Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 103 additions & 1 deletion code-interpreter/app/services/executor_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
EntryKind,
ExecutionResult,
HealthCheck,
StreamChunk,
StreamEvent,
StreamResult,
WorkspaceEntry,
wrap_last_line_interactive,
)
Expand Down Expand Up @@ -61,7 +64,7 @@ class _KubeExecContext:
"""Holds the live pod and exec stream for the duration of an execution."""

pod_name: str
exec_resp: Any # kubernetes WSClient
exec_resp: ws_client.WSClient
start: float


Expand Down Expand Up @@ -552,6 +555,52 @@ def execute_python(
files=workspace_snapshot,
)

def execute_python_streaming(
self,
*,
code: str,
stdin: str | None,
timeout_ms: int,
max_output_bytes: int,
cpu_time_limit_sec: int | None = None,
memory_limit_mb: int | None = None,
files: Sequence[tuple[str, bytes]] | None = None,
last_line_interactive: bool = True,
) -> Generator[StreamEvent, None, None]:
"""Execute Python code and yield output chunks as they arrive.

Yields StreamChunk events during execution, then a single StreamResult
at the end containing exit_code, timing, and workspace files.
"""
with self._run_in_pod(
code=code,
cpu_time_limit_sec=cpu_time_limit_sec,
memory_limit_mb=memory_limit_mb,
files=files,
last_line_interactive=last_line_interactive,
) as ctx:
if stdin:
logger.debug("Writing stdin to Python process")
ctx.exec_resp.write_stdin(stdin)

deadline = time.time() + (timeout_ms / 1000.0)
exit_code, timed_out = yield from _stream_kube_output(
ctx.exec_resp, deadline, max_output_bytes
)

if timed_out:
self._kill_python_process(ctx.pod_name)

workspace_snapshot = self._extract_workspace_snapshot(ctx.pod_name)

duration_ms = int((time.perf_counter() - ctx.start) * 1000)
yield StreamResult(
exit_code=exit_code if not timed_out else None,
timed_out=timed_out,
duration_ms=duration_ms,
files=workspace_snapshot,
)

def _validate_relative_path(self, path_str: str) -> Path:
path = Path(path_str)
if path.is_absolute():
Expand All @@ -569,3 +618,56 @@ def _validate_relative_path(self, path_str: str) -> Path:
raise ValueError("File path must not be empty.")

return Path(*sanitized_parts)


def _stream_kube_output(
exec_resp: ws_client.WSClient,
deadline: float,
max_output_bytes: int,
) -> Generator[StreamChunk, None, tuple[int | None, bool]]:
"""Read stdout/stderr from a Kubernetes exec stream and yield StreamChunk events.

Returns a (exit_code, timed_out) tuple.
"""
stdout_bytes = 0
stderr_bytes = 0
exit_code: int | None = None
timed_out = False

while exec_resp.is_open():
remaining = deadline - time.time()
if remaining <= 0:
timed_out = True
break

exec_resp.update(timeout=min(remaining, 1))

if exec_resp.peek_stdout():
text: str = exec_resp.read_stdout()
raw = text.encode("utf-8")
if stdout_bytes < max_output_bytes:
allowed = max_output_bytes - stdout_bytes
if len(raw) > allowed:
text = raw[:allowed].decode("utf-8", errors="ignore")
if text:
yield StreamChunk(stream="stdout", data=text)
stdout_bytes += len(raw)

if exec_resp.peek_stderr():
text = exec_resp.read_stderr()
raw = text.encode("utf-8")
if stderr_bytes < max_output_bytes:
allowed = max_output_bytes - stderr_bytes
if len(raw) > allowed:
text = raw[:allowed].decode("utf-8", errors="ignore")
if text:
yield StreamChunk(stream="stderr", data=text)
stderr_bytes += len(raw)

error: str = exec_resp.read_channel(ws_client.ERROR_CHANNEL)
if error:
exit_code = _parse_exit_code(error)
break

exec_resp.close()
return exit_code, timed_out
Loading