From 864108042b6a0f352b59dd36afc6bd5c9bb4233e Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Mon, 14 Jul 2025 17:42:30 +0000 Subject: [PATCH] [shim] Don't check image downloaded size MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sometimes dockerd emits less "Download complete" messages than expected, but the image is pulled successfully. The reason is unclear, but, anyway, this check is redundant since we also rely on the status message, which is emitted only in the case of succesfull pull. In addition, this patch adds/changes the following: * Write dockerd pull stream (JSON Lines) to {runnerDir}/pull.log — useful for debugging (in conjunction with DSTACK_SERVER_KEEP_SHIM_TASKS=1) * Use `errorDetail.message` instead of deprecated `error` * Move `ctx.Err()` check upper, otherwise it's shadowed by pull errors Fixes: https://github.com/dstackai/dstack/issues/2503 --- .pre-commit-config.yaml | 2 +- runner/go.mod | 3 +- runner/go.sum | 7 +- runner/internal/shim/docker.go | 105 +++++++++++++++++----------- runner/internal/shim/docker_test.go | 15 ++-- 5 files changed, 81 insertions(+), 51 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d9e8408e2c..4984bb1213 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: rev: v1.62.0 # Should match .github/workflows/build.yml hooks: - id: golangci-lint-full - language_version: 1.23.0 # Should match runner/go.mod + language_version: 1.23.8 # Should match runner/go.mod entry: bash -c 'cd runner && golangci-lint run' stages: [manual] - repo: https://github.com/pre-commit/pre-commit-hooks diff --git a/runner/go.mod b/runner/go.mod index c619383bd0..850ea82530 100644 --- a/runner/go.mod +++ b/runner/go.mod @@ -79,10 +79,11 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.20.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.5.0 // indirect + gotest.tools/v3 v3.5.1 // indirect ) diff --git a/runner/go.sum b/runner/go.sum index 801974184f..1222fcac83 100644 --- a/runner/go.sum +++ b/runner/go.sum @@ -285,8 +285,9 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/time v0.0.0-20170424234030-8be79e1e0910 h1:bCMaBn7ph495H+x72gEvgcv+mDRd9dElbzo/mVCMxX4= golang.org/x/time v0.0.0-20170424234030-8be79e1e0910/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -322,5 +323,5 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY= -gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/runner/internal/shim/docker.go b/runner/internal/shim/docker.go index 19462c9d72..1834188ae1 100644 --- a/runner/internal/shim/docker.go +++ b/runner/internal/shim/docker.go @@ -274,6 +274,13 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { cfg := task.config var err error + runnerDir, err := d.dockerParams.MakeRunnerDir(task.containerName) + if err != nil { + return tracerr.Wrap(err) + } + task.runnerDir = runnerDir + log.Debug(ctx, "runner dir", "task", task.ID, "path", runnerDir) + if cfg.GPU != 0 { gpuIDs, err := d.gpuLock.Acquire(ctx, cfg.GPU) if err != nil { @@ -335,7 +342,10 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { if err := d.tasks.Update(task); err != nil { return tracerr.Errorf("%w: failed to update task %s: %w", ErrInternal, task.ID, err) } - if err = pullImage(pullCtx, d.client, cfg); err != nil { + // Although it's called "runner dir", we also use it for shim task-related data. + // Maybe we should rename it to "task dir" (including the `/root/.dstack/runners` dir on the host). + pullLogPath := filepath.Join(runnerDir, "pull.log") + if err = pullImage(pullCtx, d.client, cfg, pullLogPath); err != nil { errMessage := fmt.Sprintf("pullImage error: %s", err.Error()) log.Error(ctx, errMessage) task.SetStatusTerminated(string(types.TerminationReasonCreatingContainerError), errMessage) @@ -655,7 +665,7 @@ func mountDisk(ctx context.Context, deviceName, mountPoint string, fsRootPerms o return nil } -func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConfig) error { +func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConfig, logPath string) error { if !strings.Contains(taskConfig.ImageName, ":") { taskConfig.ImageName += ":latest" } @@ -685,51 +695,70 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf if err != nil { return tracerr.Wrap(err) } - defer func() { _ = reader.Close() }() + defer reader.Close() + + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644) + if err != nil { + return tracerr.Wrap(err) + } + defer logFile.Close() + + teeReader := io.TeeReader(reader, logFile) current := make(map[string]uint) total := make(map[string]uint) - type ProgressDetail struct { - Current uint `json:"current"` - Total uint `json:"total"` - } - type Progress struct { - Id string `json:"id"` - Status string `json:"status"` - ProgressDetail ProgressDetail `json:"progressDetail"` //nolint:tagliatelle - Error string `json:"error"` + // dockerd reports pulling progress as a stream of JSON Lines. The format of records is not documented in the API documentation, + // although it's occasionally mentioned, e.g., https://docs.docker.com/reference/api/engine/version-history/#v148-api-changes + + // https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/pkg/jsonmessage/jsonmessage.go#L144 + // All fields are optional + type PullMessage struct { + Id string `json:"id"` // layer id + Status string `json:"status"` + ProgressDetail struct { + Current uint `json:"current"` // bytes + Total uint `json:"total"` // bytes + } `json:"progressDetail"` + ErrorDetail struct { + Message string `json:"message"` + } `json:"errorDetail"` } - var status bool + var pullCompleted bool pullErrors := make([]string, 0) - scanner := bufio.NewScanner(reader) + scanner := bufio.NewScanner(teeReader) for scanner.Scan() { line := scanner.Bytes() - var progressRow Progress - if err := json.Unmarshal(line, &progressRow); err != nil { + var pullMessage PullMessage + if err := json.Unmarshal(line, &pullMessage); err != nil { continue } - if progressRow.Status == "Downloading" { - current[progressRow.Id] = progressRow.ProgressDetail.Current - total[progressRow.Id] = progressRow.ProgressDetail.Total + if pullMessage.Status == "Downloading" { + current[pullMessage.Id] = pullMessage.ProgressDetail.Current + total[pullMessage.Id] = pullMessage.ProgressDetail.Total } - if progressRow.Status == "Download complete" { - current[progressRow.Id] = total[progressRow.Id] + if pullMessage.Status == "Download complete" { + current[pullMessage.Id] = total[pullMessage.Id] } - if progressRow.Error != "" { - log.Error(ctx, "error pulling image", "name", taskConfig.ImageName, "err", progressRow.Error) - pullErrors = append(pullErrors, progressRow.Error) + if pullMessage.ErrorDetail.Message != "" { + log.Error(ctx, "error pulling image", "name", taskConfig.ImageName, "err", pullMessage.ErrorDetail.Message) + pullErrors = append(pullErrors, pullMessage.ErrorDetail.Message) } - if strings.HasPrefix(progressRow.Status, "Status:") { - status = true - log.Debug(ctx, progressRow.Status) + // If the pull is successful, the last two entries must be: + // "Digest: sha256:" + // "Status: " + // where is either "Downloaded newer image for " or "Image is up to date for ". + // See: https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/daemon/containerd/image_pull.go#L134-L152 + // See: https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/daemon/containerd/image_pull.go#L257-L263 + if strings.HasPrefix(pullMessage.Status, "Status:") { + pullCompleted = true + log.Debug(ctx, pullMessage.Status) } } duration := time.Since(startTime) - var currentBytes uint var totalBytes uint for _, v := range current { @@ -738,9 +767,13 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf for _, v := range total { totalBytes += v } - speed := bytesize.New(float64(currentBytes) / duration.Seconds()) - if status && currentBytes == totalBytes { + + if err := ctx.Err(); err != nil { + return tracerr.Errorf("image pull interrupted: downloaded %d bytes out of %d (%s/s): %w", currentBytes, totalBytes, speed, err) + } + + if pullCompleted { log.Debug(ctx, "image successfully pulled", "bytes", currentBytes, "bps", speed) } else { return tracerr.Errorf( @@ -749,21 +782,11 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf ) } - err = ctx.Err() - if err != nil { - return tracerr.Errorf("imagepull interrupted: downloaded %d bytes out of %d (%s/s): %w", currentBytes, totalBytes, speed, err) - } return nil } func (d *DockerRunner) createContainer(ctx context.Context, task *Task) error { - runnerDir, err := d.dockerParams.MakeRunnerDir(task.containerName) - if err != nil { - return tracerr.Wrap(err) - } - task.runnerDir = runnerDir - - mounts, err := d.dockerParams.DockerMounts(runnerDir) + mounts, err := d.dockerParams.DockerMounts(task.runnerDir) if err != nil { return tracerr.Wrap(err) } diff --git a/runner/internal/shim/docker_test.go b/runner/internal/shim/docker_test.go index 35c8cbab6d..29a5e1afdb 100644 --- a/runner/internal/shim/docker_test.go +++ b/runner/internal/shim/docker_test.go @@ -26,8 +26,9 @@ func TestDocker_SSHServer(t *testing.T) { t.Parallel() params := &dockerParametersMock{ - commands: []string{"echo 1"}, - sshPort: nextPort(), + commands: []string{"echo 1"}, + sshPort: nextPort(), + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -58,6 +59,7 @@ func TestDocker_SSHServerConnect(t *testing.T) { commands: []string{"sleep 5"}, sshPort: nextPort(), publicSSHKey: string(publicBytes), + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -103,7 +105,8 @@ func TestDocker_ShmNoexecByDefault(t *testing.T) { t.Parallel() params := &dockerParametersMock{ - commands: []string{"mount | grep '/dev/shm .*size=65536k' | grep noexec"}, + commands: []string{"mount | grep '/dev/shm .*size=65536k' | grep noexec"}, + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -125,7 +128,8 @@ func TestDocker_ShmExecIfSizeSpecified(t *testing.T) { t.Parallel() params := &dockerParametersMock{ - commands: []string{"mount | grep '/dev/shm .*size=1024k' | grep -v noexec"}, + commands: []string{"mount | grep '/dev/shm .*size=1024k' | grep -v noexec"}, + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -148,6 +152,7 @@ type dockerParametersMock struct { commands []string sshPort int publicSSHKey string + runnerDir string } func (c *dockerParametersMock) DockerPrivileged() bool { @@ -184,7 +189,7 @@ func (c *dockerParametersMock) DockerMounts(string) ([]mount.Mount, error) { } func (c *dockerParametersMock) MakeRunnerDir(string) (string, error) { - return "", nil + return c.runnerDir, nil } /* Utilities */