From addc489908a6c820895ec6e3fddd91c979ade3e7 Mon Sep 17 00:00:00 2001 From: xiaojunxiang Date: Sat, 13 Jun 2026 19:56:07 +0800 Subject: [PATCH] feat(sdk): inject create-time env_vars into the guest via envd /init Sandbox.create(env_vars=...) values were recorded as sandbox metadata but never loaded into the guest runtime, so commands.run / run_code could not see them. After the sandbox is created, the Python and Go SDKs push them into the guest through envd's native POST /init, reusing the same CubeProxy data-plane channel commands.run already uses. Precedence: template env < create env < per-command env. No guest rootfs writes or extra deployment config. /init uses bounded request timeouts, jittered retries, and prompt response cleanup on both SDKs. Signed-off-by: xiaojunxiang --- sdk/go/client.go | 5 +++ sdk/go/envd.go | 67 +++++++++++++++++++++++++++ sdk/go/sdk_test.go | 64 ++++++++++++++++++++++++++ sdk/python/cubesandbox/sandbox.py | 75 ++++++++++++++++++++++++++++++- sdk/python/tests/test_sandbox.py | 23 +++++++++- 5 files changed, 231 insertions(+), 3 deletions(-) diff --git a/sdk/go/client.go b/sdk/go/client.go index 72ab498d..d49a3f71 100644 --- a/sdk/go/client.go +++ b/sdk/go/client.go @@ -74,6 +74,11 @@ func (c *Client) Create(ctx context.Context, opts CreateOptions) (*Sandbox, erro return nil, err } c.attachSandbox(&sandbox) + if len(opts.EnvVars) > 0 { + if err := sandbox.initEnvVars(ctx, opts.EnvVars); err != nil { + return nil, err + } + } return &sandbox, nil } diff --git a/sdk/go/envd.go b/sdk/go/envd.go index 4935ec55..89e26938 100644 --- a/sdk/go/envd.go +++ b/sdk/go/envd.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "net/url" "strconv" @@ -24,8 +25,18 @@ const ( connectEndStreamFlag = byte(0x02) connectCompressedFlag = byte(0x01) maxConnectEnvelopeSize = 64 * 1024 * 1024 + + envdInitMaxAttempts = 5 + envdInitRetryDelay = 800 * time.Millisecond + envdInitRetryJitter = 400 * time.Millisecond + envdInitReqTimeout = 10 * time.Second ) +type envdInitRequest struct { + EnvVars map[string]string `json:"envVars"` + Timestamp string `json:"timestamp"` +} + type processStartRequest struct { Process processConfig `json:"process"` Stdin *bool `json:"stdin,omitempty"` @@ -83,6 +94,62 @@ type connectError struct { Message string `json:"message,omitempty"` } +func (s *Sandbox) initEnvVars(ctx context.Context, envVars map[string]string) error { + if err := s.ensureClient(); err != nil { + return err + } + if len(envVars) == 0 { + return nil + } + + raw, err := json.Marshal(envdInitRequest{ + EnvVars: envVars, + Timestamp: time.Now().UTC().Format(time.RFC3339), + }) + if err != nil { + return err + } + + var lastErr error + for attempt := 0; attempt < envdInitMaxAttempts; attempt++ { + if attempt > 0 { + delay := envdInitRetryDelay + time.Duration(rand.Int63n(int64(envdInitRetryJitter))) + timer := time.NewTimer(delay) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + } + + reqCtx, cancel := context.WithTimeout(ctx, envdInitReqTimeout) + req, err := s.newEnvdRequest(reqCtx, http.MethodPost, "/init", nil, bytes.NewReader(raw)) + if err != nil { + cancel() + return err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := s.client.dataHTTP.Do(req) + cancel() + if err != nil { + lastErr = err + continue + } + + statusOK := resp.StatusCode < http.StatusBadRequest + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + if statusOK { + return nil + } + lastErr = fmt.Errorf("envd /init returned HTTP %d", resp.StatusCode) + } + + return fmt.Errorf("failed to inject create env_vars into sandbox: %w", lastErr) +} + func (s *Sandbox) startProcess(ctx context.Context, payload processStartRequest, opts CommandOptions) (*processStartResult, error) { if err := s.ensureClient(); err != nil { return nil, err diff --git a/sdk/go/sdk_test.go b/sdk/go/sdk_test.go index 07c4a426..481107dd 100644 --- a/sdk/go/sdk_test.go +++ b/sdk/go/sdk_test.go @@ -76,6 +76,25 @@ func TestNewConfigFromEnv(t *testing.T) { func TestCreateSendsPythonCompatiblePayload(t *testing.T) { var got map[string]any + var initPath string + var initHost string + var initBody map[string]any + + dataServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost || r.URL.Path != "/init" { + t.Fatalf("data request = %s %s", r.Method, r.URL.Path) + } + initPath = r.URL.Path + initHost = r.Host + if err := json.NewDecoder(r.Body).Decode(&initBody); err != nil { + t.Fatalf("decode init body: %v", err) + } + w.WriteHeader(http.StatusOK) + })) + defer dataServer.Close() + + dataHost, dataPort := serverHostPort(t, dataServer.URL) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost || r.URL.Path != "/sandboxes" { t.Fatalf("request = %s %s", r.Method, r.URL.Path) @@ -100,6 +119,8 @@ func TestCreateSendsPythonCompatiblePayload(t *testing.T) { Timeout: 300 * time.Second, RequestTimeout: time.Second, SandboxDomain: "cube.app", + ProxyNodeIP: dataHost, + ProxyPortHTTP: dataPort, }) sb, err := client.Create(context.Background(), CreateOptions{ @@ -136,6 +157,49 @@ func TestCreateSendsPythonCompatiblePayload(t *testing.T) { if _, ok := got["mcp"].(map[string]any); !ok { t.Fatalf("extra field not preserved: %#v", got["mcp"]) } + + if initPath != "/init" { + t.Fatalf("init path=%q", initPath) + } + if initHost != fmt.Sprintf("%d-%s.cube.app", JupyterPort, testSandboxID) { + t.Fatalf("init Host=%q", initHost) + } + assertMapString(t, initBody["envVars"], "FOO", "bar") + if _, ok := initBody["timestamp"].(string); !ok { + t.Fatalf("init timestamp missing: %#v", initBody["timestamp"]) + } +} + +func TestCreateWithoutEnvVarsSkipsInit(t *testing.T) { + dataCalled := false + dataServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + dataCalled = true + w.WriteHeader(http.StatusOK) + })) + defer dataServer.Close() + + dataHost, dataPort := serverHostPort(t, dataServer.URL) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) + fmt.Fprint(w, sandboxJSON(testSandboxID, "tpl-no-init")) + })) + defer server.Close() + + client := NewClient(Config{ + APIURL: server.URL, + TemplateID: "tpl-no-init", + Timeout: 300 * time.Second, + RequestTimeout: time.Second, + ProxyNodeIP: dataHost, + ProxyPortHTTP: dataPort, + }) + if _, err := client.Create(context.Background(), CreateOptions{}); err != nil { + t.Fatalf("Create returned error: %v", err) + } + if dataCalled { + t.Fatal("data plane was called without env_vars") + } } func TestCreateOmitsOptionalFieldsAndRequiresTemplate(t *testing.T) { diff --git a/sdk/python/cubesandbox/sandbox.py b/sdk/python/cubesandbox/sandbox.py index 391dd698..237d090e 100644 --- a/sdk/python/cubesandbox/sandbox.py +++ b/sdk/python/cubesandbox/sandbox.py @@ -3,12 +3,15 @@ from __future__ import annotations +import random +import time +from datetime import datetime, timezone from typing import Any, Callable, Dict import httpx import requests -from ._commands import CommandResult, Commands +from ._commands import CommandResult, Commands, ENVD_PORT from ._config import Config from ._exceptions import ApiError, AuthenticationError, CubeSandboxError, SandboxNotFoundError, TemplateNotFoundError from ._filesystem import Filesystem @@ -18,6 +21,10 @@ from ._transport import build_client JUPYTER_PORT = 49999 +ENVD_INIT_MAX_ATTEMPTS = 5 +ENVD_INIT_RETRY_BASE_SECS = 0.8 +ENVD_INIT_RETRY_JITTER_SECS = 0.4 +ENVD_INIT_REQ_TIMEOUT_SECS = 10.0 def _check_response(resp: requests.Response) -> None: @@ -156,7 +163,10 @@ def create( resp = s.post(f"{cfg.api_url}/sandboxes", json=payload, headers={"Content-Type": "application/json"}) _check_response(resp) - return cls(resp.json(), config=cfg) + sandbox = cls(resp.json(), config=cfg) + if env_vars: + sandbox._init_env_vars(env_vars) + return sandbox @classmethod def connect(cls, sandbox_id: str, *, config: Config | None = None) -> "Sandbox": @@ -667,3 +677,64 @@ def _build_session(self) -> requests.Session: def _build_data_client(self) -> httpx.Client: """Build an HTTP client for CubeProxy-routed sandbox data-plane APIs.""" return build_client(self._config) + + def _init_env_vars(self, env_vars: Dict[str, str]) -> None: + """Make create-time env_vars visible to later commands.run / run_code. + + The control plane only records env_vars as sandbox metadata; it never + loads them into the guest runtime. So once the sandbox is up we push + them into the guest via envd's native POST /init, reusing the exact + CubeProxy data-plane channel commands.run already uses. envd stores them + as global defaults and merges them into every later process execution, + giving the precedence ``template env < create env < per-command env``. + + Routing through the configured data-plane client means no extra + deployment configuration is required: whatever address commands.run + reaches envd on, /init reaches it on too. + """ + if not env_vars: + return + if self._client is None: + self._client = self._build_data_client() + + headers = {} + access_token = self._data.get("envdAccessToken") + if access_token: + headers["X-Access-Token"] = access_token + + url = f"http://{self.get_host(ENVD_PORT)}/init" + body = { + "envVars": env_vars, + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + # The proxy route to a freshly created sandbox may settle a moment after + # create returns, so retry briefly before surfacing a hard failure. + last_error: Exception | None = None + for attempt in range(ENVD_INIT_MAX_ATTEMPTS): + if attempt: + delay = ENVD_INIT_RETRY_BASE_SECS + random.uniform( + 0, ENVD_INIT_RETRY_JITTER_SECS + ) + time.sleep(delay) + try: + resp = self._client.post( + url, + json=body, + headers=headers, + timeout=ENVD_INIT_REQ_TIMEOUT_SECS, + ) + try: + if resp.status_code < 400: + return + last_error = RuntimeError( + f"envd /init returned HTTP {resp.status_code}" + ) + finally: + resp.close() + except BaseException as exc: + if isinstance(exc, (KeyboardInterrupt, SystemExit)): + raise + last_error = exc + + raise CubeSandboxError(f"failed to inject create env_vars into sandbox: {last_error}") diff --git a/sdk/python/tests/test_sandbox.py b/sdk/python/tests/test_sandbox.py index 8fefb318..e1a3601b 100644 --- a/sdk/python/tests/test_sandbox.py +++ b/sdk/python/tests/test_sandbox.py @@ -105,10 +105,31 @@ def test_create_sends_template_and_timeout(self): assert body["timeout"] == 600 def test_create_sends_env_vars(self): - with patch("requests.Session.post", return_value=mock_response(SANDBOX_DATA, status=201)) as m: + captured: dict = {} + + def handler(request: httpx.Request) -> httpx.Response: + captured["url"] = str(request.url) + captured["body"] = json.loads(request.content) + return httpx.Response(200) + + client = httpx.Client(transport=httpx.MockTransport(handler)) + with patch("requests.Session.post", return_value=mock_response(SANDBOX_DATA, status=201)) as m, \ + patch.object(Sandbox, "_build_data_client", return_value=client): Sandbox.create(env_vars={"FOO": "bar"}, config=make_config()) + body = m.call_args.kwargs["json"] assert body["envVars"] == {"FOO": "bar"} + # SDK transparently pushes create-time env_vars into the guest via + # envd's native /init so later commands.run can read them. + assert captured["url"].endswith("/init") + assert captured["body"]["envVars"] == {"FOO": "bar"} + + def test_create_without_env_vars_skips_init(self): + build = MagicMock() + with patch("requests.Session.post", return_value=mock_response(SANDBOX_DATA, status=201)), \ + patch.object(Sandbox, "_build_data_client", new=build): + Sandbox.create(config=make_config()) + build.assert_not_called() def test_create_sends_metadata(self): meta = {"network-policy": "deny-all"}