From dd739431e60ec1c2c95ecb789a12d7623568fb44 Mon Sep 17 00:00:00 2001 From: Dane Urban Date: Sun, 22 Feb 2026 14:41:00 -0800 Subject: [PATCH 1/4] 1 --- .../app/services/executor_kubernetes.py | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index 15957e7..99ea677 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -551,6 +551,52 @@ def execute_python( duration_ms=duration_ms, files=workspace_snapshot, ) + + def execute_python_streaming( + self, + *, + code: str, + stdin: str | None, + timeout_ms: int, + max_output_bytes: int, + cpu_time_limit_sec: int | None = None, + memory_limit_mb: int | None = None, + files: Sequence[tuple[str, bytes]] | None = None, + last_line_interactive: bool = True, + ) -> Generator[StreamEvent, None, None]: + """Execute Python code and yield output chunks as they arrive. + + Yields StreamChunk events during execution, then a single StreamResult + at the end containing exit_code, timing, and workspace files. + """ + with self._run_in_pod( + code=code, + cpu_time_limit_sec=cpu_time_limit_sec, + memory_limit_mb=memory_limit_mb, + files=files, + last_line_interactive=last_line_interactive, + ) as ctx: + if stdin: + logger.debug("Writing stdin to Python process") + ctx.exec_resp.write_stdin(stdin) + + deadline = time.time() + (timeout_ms / 1000.0) + exit_code, timed_out = yield from _stream_kube_output( + ctx.exec_resp, deadline, max_output_bytes + ) + + if timed_out: + self._kill_python_process(ctx.pod_name) + + workspace_snapshot = self._extract_workspace_snapshot(ctx.pod_name) + + duration_ms = int((time.perf_counter() - ctx.start) * 1000) + yield StreamResult( + exit_code=exit_code if not timed_out else None, + timed_out=timed_out, + duration_ms=duration_ms, + files=workspace_snapshot, + ) def _validate_relative_path(self, path_str: str) -> Path: path = Path(path_str) From ba32083e85176de18f2be7710f6ea80d3eadfeda Mon Sep 17 00:00:00 2001 From: Dane Urban Date: Sun, 22 Feb 2026 14:43:54 -0800 Subject: [PATCH 2/4] . --- .../app/services/executor_kubernetes.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index 99ea677..980178e 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -32,6 +32,9 @@ EntryKind, ExecutionResult, HealthCheck, + StreamEvent, + StreamResult, + StreamChunk, WorkspaceEntry, wrap_last_line_interactive, ) @@ -615,3 +618,56 @@ def _validate_relative_path(self, path_str: str) -> Path: raise ValueError("File path must not be empty.") return Path(*sanitized_parts) + + +def _stream_kube_output( + exec_resp: Any, + deadline: float, + max_output_bytes: int, +) -> Generator[StreamChunk, None, tuple[int | None, bool]]: + """Read stdout/stderr from a Kubernetes exec stream and yield StreamChunk events. + + Returns a (exit_code, timed_out) tuple. + """ + stdout_bytes = 0 + stderr_bytes = 0 + exit_code: int | None = None + timed_out = False + + while exec_resp.is_open(): + remaining = deadline - time.time() + if remaining <= 0: + timed_out = True + break + + exec_resp.update(timeout=min(remaining, 1)) + + if exec_resp.peek_stdout(): + text: str = exec_resp.read_stdout() + raw = text.encode("utf-8") + if stdout_bytes < max_output_bytes: + allowed = max_output_bytes - stdout_bytes + if len(raw) > allowed: + text = raw[:allowed].decode("utf-8", errors="ignore") + if text: + yield StreamChunk(stream="stdout", data=text) + stdout_bytes += len(raw) + + if exec_resp.peek_stderr(): + text = exec_resp.read_stderr() + raw = text.encode("utf-8") + if stderr_bytes < max_output_bytes: + allowed = max_output_bytes - stderr_bytes + if len(raw) > allowed: + text = raw[:allowed].decode("utf-8", errors="ignore") + if text: + yield StreamChunk(stream="stderr", data=text) + stderr_bytes += len(raw) + + error: str = exec_resp.read_channel(ws_client.ERROR_CHANNEL) + if error: + exit_code = _parse_exit_code(error) + break + + exec_resp.close() + return exit_code, timed_out From 8b5d58e0767bd1a9db1af7bcf3f3844e86bc9925 Mon Sep 17 00:00:00 2001 From: Dane Urban Date: Sun, 22 Feb 2026 15:07:19 -0800 Subject: [PATCH 3/4] add stuff --- .../app/services/executor_kubernetes.py | 2 +- .../test_kubernetes_streaming.py | 346 ++++++++++++++++++ 2 files changed, 347 insertions(+), 1 deletion(-) create mode 100644 code-interpreter/tests/integration_tests/test_kubernetes_streaming.py diff --git a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index 980178e..28c1d33 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -554,7 +554,7 @@ def execute_python( duration_ms=duration_ms, files=workspace_snapshot, ) - + def execute_python_streaming( self, *, diff --git a/code-interpreter/tests/integration_tests/test_kubernetes_streaming.py b/code-interpreter/tests/integration_tests/test_kubernetes_streaming.py new file mode 100644 index 0000000..1911047 --- /dev/null +++ b/code-interpreter/tests/integration_tests/test_kubernetes_streaming.py @@ -0,0 +1,346 @@ +"""Tests for KubernetesExecutor.execute_python_streaming. + +Unit tests that mock the Kubernetes API to exercise the streaming +execution path without requiring a real cluster. +""" + +from __future__ import annotations + +import base64 +import io +import tarfile +from typing import Any +from unittest.mock import MagicMock, patch + +import pytest + +from app.services.executor_base import StreamChunk, StreamEvent, StreamResult +from app.services.executor_kubernetes import KubernetesExecutor + +# --------------------------------------------------------------------------- +# Fixtures & helpers +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def executor() -> KubernetesExecutor: + """Create a KubernetesExecutor bypassing __init__ (no cluster needed).""" + inst = KubernetesExecutor.__new__(KubernetesExecutor) + inst.v1 = MagicMock() + inst.namespace = "test" + inst.image = "test:latest" + inst.service_account = "" + pod_mock = MagicMock() + pod_mock.status.phase = "Running" + inst.v1.read_namespaced_pod.return_value = pod_mock + return inst + + +class FakeExecResp: + """Simulates a Kubernetes WebSocket exec stream. + + Delivers *stdout_chunks* and *stderr_chunks* one per loop iteration. + Once both queues are drained, the *exit_status* is returned on the + error channel, causing the reader to break out of the loop. + """ + + def __init__( + self, + stdout_chunks: list[str] | None = None, + stderr_chunks: list[str] | None = None, + exit_status: str = "{'status': 'Success'}", + ) -> None: + self._stdout = list(stdout_chunks or []) + self._stderr = list(stderr_chunks or []) + self._exit_status = exit_status + self._closed = False + self._exit_delivered = False + self.stdin_writes: list[Any] = [] + + def is_open(self) -> bool: + if self._closed: + return False + return bool(self._stdout or self._stderr or not self._exit_delivered) + + def update(self, timeout: float = 1) -> None: # noqa: ARG002 + pass + + def peek_stdout(self) -> bool: + return bool(self._stdout) + + def read_stdout(self) -> str: + return self._stdout.pop(0) + + def peek_stderr(self) -> bool: + return bool(self._stderr) + + def read_stderr(self) -> str: + return self._stderr.pop(0) + + def read_channel(self, channel: int) -> str: # noqa: ARG002 + if not self._stdout and not self._stderr and not self._exit_delivered: + self._exit_delivered = True + return self._exit_status + return "" + + def write_stdin(self, data: str | bytes) -> None: + self.stdin_writes.append(data) + + def close(self) -> None: + self._closed = True + + +def _make_tar_mock() -> MagicMock: + """Mock for the tar-upload exec stream (succeeds immediately).""" + resp = MagicMock() + resp.is_open.side_effect = [True, False] + resp.peek_stdout.return_value = False + resp.peek_stderr.return_value = False + resp.read_channel.return_value = "{'status': 'Success'}" + return resp + + +def _make_snapshot_mock(files: dict[str, bytes] | None = None) -> MagicMock: + """Mock for workspace-snapshot exec stream.""" + resp = MagicMock() + if not files: + resp.is_open.side_effect = [False] + resp.peek_stdout.return_value = False + resp.peek_stderr.return_value = False + return resp + + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w") as tar: + for name, content in files.items(): + info = tarfile.TarInfo(name=name) + info.size = len(content) + tar.addfile(info, io.BytesIO(content)) + b64 = base64.b64encode(buf.getvalue()).decode("ascii") + + resp.is_open.side_effect = [True, False] + resp.peek_stdout.side_effect = [True, False] + resp.read_stdout.return_value = b64 + resp.peek_stderr.return_value = False + return resp + + +def _run_streaming( + executor: KubernetesExecutor, + exec_resp: FakeExecResp, + *, + extra_stream_mocks: list[Any] | None = None, + snapshot_files: dict[str, bytes] | None = None, + **kwargs: object, +) -> list[StreamEvent]: + """Run execute_python_streaming with mocked Kubernetes streams.""" + mocks: list[Any] = [_make_tar_mock(), exec_resp] + if extra_stream_mocks: + mocks.extend(extra_stream_mocks) + mocks.append(_make_snapshot_mock(snapshot_files)) + + defaults: dict[str, Any] = { + "code": "print('hello')", + "stdin": None, + "timeout_ms": 5000, + "max_output_bytes": 65536, + } + defaults.update(kwargs) + + with patch("app.services.executor_kubernetes.stream.stream") as mock_stream: + mock_stream.side_effect = mocks + return list(executor.execute_python_streaming(**defaults)) + + +def _chunks(events: list[StreamEvent]) -> list[StreamChunk]: + return [e for e in events if isinstance(e, StreamChunk)] + + +def _result(events: list[StreamEvent]) -> StreamResult: + results = [e for e in events if isinstance(e, StreamResult)] + assert len(results) == 1, f"Expected exactly 1 StreamResult, got {len(results)}" + return results[0] + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +def test_streaming_yields_stdout_chunks(executor: KubernetesExecutor) -> None: + events = _run_streaming(executor, FakeExecResp(stdout_chunks=["hello\n"])) + + chunks = _chunks(events) + assert len(chunks) == 1 + assert chunks[0] == StreamChunk(stream="stdout", data="hello\n") + + result = _result(events) + assert result.exit_code == 0 + assert result.timed_out is False + assert result.duration_ms >= 0 + + +def test_streaming_yields_stderr_chunks(executor: KubernetesExecutor) -> None: + events = _run_streaming(executor, FakeExecResp(stderr_chunks=["oops\n"])) + + chunks = _chunks(events) + assert len(chunks) == 1 + assert chunks[0] == StreamChunk(stream="stderr", data="oops\n") + + +def test_streaming_multiple_stdout_chunks(executor: KubernetesExecutor) -> None: + events = _run_streaming(executor, FakeExecResp(stdout_chunks=["line1\n", "line2\n"])) + + chunks = _chunks(events) + assert len(chunks) == 2 + assert chunks[0].data == "line1\n" + assert chunks[1].data == "line2\n" + assert all(c.stream == "stdout" for c in chunks) + + +def test_streaming_mixed_stdout_and_stderr(executor: KubernetesExecutor) -> None: + events = _run_streaming( + executor, FakeExecResp(stdout_chunks=["out\n"], stderr_chunks=["err\n"]) + ) + + chunks = _chunks(events) + stdout = [c for c in chunks if c.stream == "stdout"] + stderr = [c for c in chunks if c.stream == "stderr"] + + assert len(stdout) == 1 + assert stdout[0].data == "out\n" + assert len(stderr) == 1 + assert stderr[0].data == "err\n" + + +def test_streaming_nonzero_exit_code(executor: KubernetesExecutor) -> None: + events = _run_streaming( + executor, + FakeExecResp( + stderr_chunks=["error!\n"], + exit_status="{'status': 'Failure', 'details': {'exitCode': 1}}", + ), + ) + + result = _result(events) + assert result.exit_code == 1 + assert result.timed_out is False + + +def test_streaming_timeout(executor: KubernetesExecutor) -> None: + """timeout_ms=0 guarantees immediate timeout.""" + events = _run_streaming( + executor, + FakeExecResp(), + extra_stream_mocks=[MagicMock()], # _kill_python_process + timeout_ms=0, + ) + + assert _chunks(events) == [] + result = _result(events) + assert result.exit_code is None + assert result.timed_out is True + + +def test_streaming_timeout_calls_kill(executor: KubernetesExecutor) -> None: + """Verify _kill_python_process is invoked on timeout.""" + exec_resp = FakeExecResp() + + with patch("app.services.executor_kubernetes.stream.stream") as mock_stream: + mock_stream.side_effect = [ + _make_tar_mock(), + exec_resp, + MagicMock(), # kill + _make_snapshot_mock(), + ] + list( + executor.execute_python_streaming( + code="import time; time.sleep(999)", + stdin=None, + timeout_ms=0, + max_output_bytes=65536, + ) + ) + + kill_calls = [ + c + for c in mock_stream.call_args_list + if c.kwargs.get("command") == ["pkill", "-9", "python"] + ] + assert len(kill_calls) == 1 + + +def test_streaming_truncates_stdout(executor: KubernetesExecutor) -> None: + """A single chunk exceeding the byte budget is truncated.""" + events = _run_streaming( + executor, + FakeExecResp(stdout_chunks=["hello world"]), + max_output_bytes=5, + ) + + chunks = _chunks(events) + assert len(chunks) == 1 + assert chunks[0].data == "hello" + assert chunks[0].stream == "stdout" + + +def test_streaming_suppresses_chunks_past_limit( + executor: KubernetesExecutor, +) -> None: + """Once the byte budget is exhausted, further chunks are not yielded.""" + events = _run_streaming( + executor, + FakeExecResp(stdout_chunks=["aaa", "bbb"]), + max_output_bytes=3, + ) + + chunks = _chunks(events) + assert len(chunks) == 1 + assert chunks[0].data == "aaa" + + +def test_streaming_forwards_stdin(executor: KubernetesExecutor) -> None: + exec_resp = FakeExecResp(stdout_chunks=["echoed\n"]) + _run_streaming(executor, exec_resp, stdin="input data") + + assert exec_resp.stdin_writes == ["input data"] + + +def test_streaming_includes_workspace_files( + executor: KubernetesExecutor, +) -> None: + events = _run_streaming( + executor, + FakeExecResp(), + snapshot_files={"output.txt": b"file content"}, + ) + + result = _result(events) + assert len(result.files) == 1 + assert result.files[0].path == "output.txt" + assert result.files[0].content == b"file content" + + +def test_streaming_cleans_up_pod(executor: KubernetesExecutor) -> None: + """Pod is deleted via _cleanup_pod regardless of outcome.""" + _run_streaming(executor, FakeExecResp()) + + executor.v1.delete_namespaced_pod.assert_called_once() + + +def test_streaming_empty_output(executor: KubernetesExecutor) -> None: + events = _run_streaming(executor, FakeExecResp()) + + assert _chunks(events) == [] + result = _result(events) + assert result.exit_code == 0 + assert result.timed_out is False + + +def test_streaming_always_ends_with_result( + executor: KubernetesExecutor, +) -> None: + """The last event yielded must always be a StreamResult.""" + events = _run_streaming(executor, FakeExecResp(stdout_chunks=["data\n"])) + + assert len(events) >= 1 + assert isinstance(events[-1], StreamResult) From 44b91df468ba38878ef6493a8e02458810c36386 Mon Sep 17 00:00:00 2001 From: Dane Urban Date: Mon, 23 Feb 2026 17:19:20 -0800 Subject: [PATCH 4/4] . --- code-interpreter/app/services/executor_kubernetes.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index 28c1d33..0f611cb 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -32,9 +32,9 @@ EntryKind, ExecutionResult, HealthCheck, + StreamChunk, StreamEvent, StreamResult, - StreamChunk, WorkspaceEntry, wrap_last_line_interactive, ) @@ -64,7 +64,7 @@ class _KubeExecContext: """Holds the live pod and exec stream for the duration of an execution.""" pod_name: str - exec_resp: Any # kubernetes WSClient + exec_resp: ws_client.WSClient start: float @@ -621,7 +621,7 @@ def _validate_relative_path(self, path_str: str) -> Path: def _stream_kube_output( - exec_resp: Any, + exec_resp: ws_client.WSClient, deadline: float, max_output_bytes: int, ) -> Generator[StreamChunk, None, tuple[int | None, bool]]: