Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions sdk/go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (c *Client) Create(ctx context.Context, opts CreateOptions) (*Sandbox, erro
return nil, err
}
c.attachSandbox(&sandbox)
if len(opts.EnvVars) > 0 {
if err := sandbox.initEnvVars(ctx, opts.EnvVars); err != nil {
return nil, err
}
}
return &sandbox, nil
}

Expand Down
67 changes: 67 additions & 0 deletions sdk/go/envd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"strconv"
Expand All @@ -24,8 +25,18 @@ const (
connectEndStreamFlag = byte(0x02)
connectCompressedFlag = byte(0x01)
maxConnectEnvelopeSize = 64 * 1024 * 1024

envdInitMaxAttempts = 5
envdInitRetryDelay = 800 * time.Millisecond
envdInitRetryJitter = 400 * time.Millisecond
envdInitReqTimeout = 10 * time.Second
)

type envdInitRequest struct {
EnvVars map[string]string `json:"envVars"`
Timestamp string `json:"timestamp"`
}

type processStartRequest struct {
Process processConfig `json:"process"`
Stdin *bool `json:"stdin,omitempty"`
Expand Down Expand Up @@ -83,6 +94,62 @@ type connectError struct {
Message string `json:"message,omitempty"`
}

func (s *Sandbox) initEnvVars(ctx context.Context, envVars map[string]string) error {
if err := s.ensureClient(); err != nil {
return err
}
if len(envVars) == 0 {
return nil
}

raw, err := json.Marshal(envdInitRequest{
EnvVars: envVars,
Timestamp: time.Now().UTC().Format(time.RFC3339),
})
if err != nil {
return err
}

var lastErr error
for attempt := 0; attempt < envdInitMaxAttempts; attempt++ {
if attempt > 0 {
delay := envdInitRetryDelay + time.Duration(rand.Int63n(int64(envdInitRetryJitter)))
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}
}

reqCtx, cancel := context.WithTimeout(ctx, envdInitReqTimeout)
req, err := s.newEnvdRequest(reqCtx, http.MethodPost, "/init", nil, bytes.NewReader(raw))
if err != nil {
cancel()
return err
}
req.Header.Set("Content-Type", "application/json")

resp, err := s.client.dataHTTP.Do(req)
Comment thread
xiaojunxiang2023 marked this conversation as resolved.
cancel()
if err != nil {
lastErr = err
continue
}

statusOK := resp.StatusCode < http.StatusBadRequest
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if statusOK {
return nil
}
lastErr = fmt.Errorf("envd /init returned HTTP %d", resp.StatusCode)
}

return fmt.Errorf("failed to inject create env_vars into sandbox: %w", lastErr)
}

func (s *Sandbox) startProcess(ctx context.Context, payload processStartRequest, opts CommandOptions) (*processStartResult, error) {
if err := s.ensureClient(); err != nil {
return nil, err
Expand Down
64 changes: 64 additions & 0 deletions sdk/go/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ func TestNewConfigFromEnv(t *testing.T) {

func TestCreateSendsPythonCompatiblePayload(t *testing.T) {
var got map[string]any
var initPath string
var initHost string
var initBody map[string]any

dataServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost || r.URL.Path != "/init" {
t.Fatalf("data request = %s %s", r.Method, r.URL.Path)
}
initPath = r.URL.Path
initHost = r.Host
if err := json.NewDecoder(r.Body).Decode(&initBody); err != nil {
t.Fatalf("decode init body: %v", err)
}
w.WriteHeader(http.StatusOK)
}))
defer dataServer.Close()

dataHost, dataPort := serverHostPort(t, dataServer.URL)

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost || r.URL.Path != "/sandboxes" {
t.Fatalf("request = %s %s", r.Method, r.URL.Path)
Expand All @@ -100,6 +119,8 @@ func TestCreateSendsPythonCompatiblePayload(t *testing.T) {
Timeout: 300 * time.Second,
RequestTimeout: time.Second,
SandboxDomain: "cube.app",
ProxyNodeIP: dataHost,
ProxyPortHTTP: dataPort,
})

sb, err := client.Create(context.Background(), CreateOptions{
Expand Down Expand Up @@ -136,6 +157,49 @@ func TestCreateSendsPythonCompatiblePayload(t *testing.T) {
if _, ok := got["mcp"].(map[string]any); !ok {
t.Fatalf("extra field not preserved: %#v", got["mcp"])
}

if initPath != "/init" {
t.Fatalf("init path=%q", initPath)
}
if initHost != fmt.Sprintf("%d-%s.cube.app", JupyterPort, testSandboxID) {
t.Fatalf("init Host=%q", initHost)
}
assertMapString(t, initBody["envVars"], "FOO", "bar")
if _, ok := initBody["timestamp"].(string); !ok {
t.Fatalf("init timestamp missing: %#v", initBody["timestamp"])
}
}

func TestCreateWithoutEnvVarsSkipsInit(t *testing.T) {
dataCalled := false
dataServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
dataCalled = true
w.WriteHeader(http.StatusOK)
}))
defer dataServer.Close()

dataHost, dataPort := serverHostPort(t, dataServer.URL)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusCreated)
fmt.Fprint(w, sandboxJSON(testSandboxID, "tpl-no-init"))
}))
defer server.Close()

client := NewClient(Config{
APIURL: server.URL,
TemplateID: "tpl-no-init",
Timeout: 300 * time.Second,
RequestTimeout: time.Second,
ProxyNodeIP: dataHost,
ProxyPortHTTP: dataPort,
})
if _, err := client.Create(context.Background(), CreateOptions{}); err != nil {
t.Fatalf("Create returned error: %v", err)
}
if dataCalled {
t.Fatal("data plane was called without env_vars")
}
}

func TestCreateOmitsOptionalFieldsAndRequiresTemplate(t *testing.T) {
Expand Down
75 changes: 73 additions & 2 deletions sdk/python/cubesandbox/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

from __future__ import annotations

import random
import time
from datetime import datetime, timezone
from typing import Any, Callable, Dict

import httpx
import requests

from ._commands import CommandResult, Commands
from ._commands import CommandResult, Commands, ENVD_PORT
from ._config import Config
from ._exceptions import ApiError, AuthenticationError, CubeSandboxError, SandboxNotFoundError, TemplateNotFoundError
from ._filesystem import Filesystem
Expand All @@ -18,6 +21,10 @@
from ._transport import build_client

JUPYTER_PORT = 49999
ENVD_INIT_MAX_ATTEMPTS = 5
ENVD_INIT_RETRY_BASE_SECS = 0.8
ENVD_INIT_RETRY_JITTER_SECS = 0.4
ENVD_INIT_REQ_TIMEOUT_SECS = 10.0


def _check_response(resp: requests.Response) -> None:
Expand Down Expand Up @@ -156,7 +163,10 @@ def create(
resp = s.post(f"{cfg.api_url}/sandboxes", json=payload,
headers={"Content-Type": "application/json"})
_check_response(resp)
return cls(resp.json(), config=cfg)
sandbox = cls(resp.json(), config=cfg)
if env_vars:
sandbox._init_env_vars(env_vars)
return sandbox

@classmethod
def connect(cls, sandbox_id: str, *, config: Config | None = None) -> "Sandbox":
Expand Down Expand Up @@ -667,3 +677,64 @@ def _build_session(self) -> requests.Session:
def _build_data_client(self) -> httpx.Client:
"""Build an HTTP client for CubeProxy-routed sandbox data-plane APIs."""
return build_client(self._config)

def _init_env_vars(self, env_vars: Dict[str, str]) -> None:
"""Make create-time env_vars visible to later commands.run / run_code.

The control plane only records env_vars as sandbox metadata; it never
loads them into the guest runtime. So once the sandbox is up we push
them into the guest via envd's native POST /init, reusing the exact
CubeProxy data-plane channel commands.run already uses. envd stores them
as global defaults and merges them into every later process execution,
giving the precedence ``template env < create env < per-command env``.

Routing through the configured data-plane client means no extra
deployment configuration is required: whatever address commands.run
reaches envd on, /init reaches it on too.
"""
if not env_vars:
return
if self._client is None:
self._client = self._build_data_client()
Comment thread
xiaojunxiang2023 marked this conversation as resolved.

headers = {}
access_token = self._data.get("envdAccessToken")
if access_token:
headers["X-Access-Token"] = access_token

url = f"http://{self.get_host(ENVD_PORT)}/init"
body = {
"envVars": env_vars,
"timestamp": datetime.now(timezone.utc).isoformat(),
}

# The proxy route to a freshly created sandbox may settle a moment after

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any clues on this?

# create returns, so retry briefly before surfacing a hard failure.
last_error: Exception | None = None
for attempt in range(ENVD_INIT_MAX_ATTEMPTS):
if attempt:
delay = ENVD_INIT_RETRY_BASE_SECS + random.uniform(
0, ENVD_INIT_RETRY_JITTER_SECS
)
time.sleep(delay)
try:
resp = self._client.post(
url,
json=body,
headers=headers,
timeout=ENVD_INIT_REQ_TIMEOUT_SECS,
)
try:
if resp.status_code < 400:
return
last_error = RuntimeError(
f"envd /init returned HTTP {resp.status_code}"
)
finally:
resp.close()
except BaseException as exc:
if isinstance(exc, (KeyboardInterrupt, SystemExit)):
raise
last_error = exc

raise CubeSandboxError(f"failed to inject create env_vars into sandbox: {last_error}")
23 changes: 22 additions & 1 deletion sdk/python/tests/test_sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,31 @@ def test_create_sends_template_and_timeout(self):
assert body["timeout"] == 600

def test_create_sends_env_vars(self):
with patch("requests.Session.post", return_value=mock_response(SANDBOX_DATA, status=201)) as m:
captured: dict = {}

def handler(request: httpx.Request) -> httpx.Response:
captured["url"] = str(request.url)
captured["body"] = json.loads(request.content)
return httpx.Response(200)

client = httpx.Client(transport=httpx.MockTransport(handler))
with patch("requests.Session.post", return_value=mock_response(SANDBOX_DATA, status=201)) as m, \
patch.object(Sandbox, "_build_data_client", return_value=client):
Sandbox.create(env_vars={"FOO": "bar"}, config=make_config())

body = m.call_args.kwargs["json"]
assert body["envVars"] == {"FOO": "bar"}
# SDK transparently pushes create-time env_vars into the guest via
# envd's native /init so later commands.run can read them.
assert captured["url"].endswith("/init")
assert captured["body"]["envVars"] == {"FOO": "bar"}

def test_create_without_env_vars_skips_init(self):
build = MagicMock()
with patch("requests.Session.post", return_value=mock_response(SANDBOX_DATA, status=201)), \
patch.object(Sandbox, "_build_data_client", new=build):
Sandbox.create(config=make_config())
build.assert_not_called()

def test_create_sends_metadata(self):
meta = {"network-policy": "deny-all"}
Expand Down
Loading