diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d9e8408e2c..4984bb1213 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: rev: v1.62.0 # Should match .github/workflows/build.yml hooks: - id: golangci-lint-full - language_version: 1.23.0 # Should match runner/go.mod + language_version: 1.23.8 # Should match runner/go.mod entry: bash -c 'cd runner && golangci-lint run' stages: [manual] - repo: https://github.com/pre-commit/pre-commit-hooks diff --git a/runner/go.mod b/runner/go.mod index c619383bd0..850ea82530 100644 --- a/runner/go.mod +++ b/runner/go.mod @@ -79,10 +79,11 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect + golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.20.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.5.0 // indirect + gotest.tools/v3 v3.5.1 // indirect ) diff --git a/runner/go.sum b/runner/go.sum index 801974184f..1222fcac83 100644 --- a/runner/go.sum +++ b/runner/go.sum @@ -285,8 +285,9 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/time v0.0.0-20170424234030-8be79e1e0910 h1:bCMaBn7ph495H+x72gEvgcv+mDRd9dElbzo/mVCMxX4= golang.org/x/time v0.0.0-20170424234030-8be79e1e0910/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= 
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -322,5 +323,5 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY= -gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= diff --git a/runner/internal/shim/docker.go b/runner/internal/shim/docker.go index 19462c9d72..1834188ae1 100644 --- a/runner/internal/shim/docker.go +++ b/runner/internal/shim/docker.go @@ -274,6 +274,13 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { cfg := task.config var err error + runnerDir, err := d.dockerParams.MakeRunnerDir(task.containerName) + if err != nil { + return tracerr.Wrap(err) + } + task.runnerDir = runnerDir + log.Debug(ctx, "runner dir", "task", task.ID, "path", runnerDir) + if cfg.GPU != 0 { gpuIDs, err := d.gpuLock.Acquire(ctx, cfg.GPU) if err != nil { @@ -335,7 +342,10 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { if err := d.tasks.Update(task); err != nil { return tracerr.Errorf("%w: failed to update task %s: %w", ErrInternal, task.ID, err) } - if err = pullImage(pullCtx, d.client, cfg); err != nil { + // Although it's called "runner dir", we also use it for shim task-related data. 
+ // Maybe we should rename it to "task dir" (including the `/root/.dstack/runners` dir on the host). + pullLogPath := filepath.Join(runnerDir, "pull.log") + if err = pullImage(pullCtx, d.client, cfg, pullLogPath); err != nil { errMessage := fmt.Sprintf("pullImage error: %s", err.Error()) log.Error(ctx, errMessage) task.SetStatusTerminated(string(types.TerminationReasonCreatingContainerError), errMessage) @@ -655,7 +665,7 @@ func mountDisk(ctx context.Context, deviceName, mountPoint string, fsRootPerms o return nil } -func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConfig) error { +func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConfig, logPath string) error { if !strings.Contains(taskConfig.ImageName, ":") { taskConfig.ImageName += ":latest" } @@ -685,51 +695,70 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf if err != nil { return tracerr.Wrap(err) } - defer func() { _ = reader.Close() }() + defer reader.Close() + + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644) + if err != nil { + return tracerr.Wrap(err) + } + defer logFile.Close() + + teeReader := io.TeeReader(reader, logFile) current := make(map[string]uint) total := make(map[string]uint) - type ProgressDetail struct { - Current uint `json:"current"` - Total uint `json:"total"` - } - type Progress struct { - Id string `json:"id"` - Status string `json:"status"` - ProgressDetail ProgressDetail `json:"progressDetail"` //nolint:tagliatelle - Error string `json:"error"` + // dockerd reports pulling progress as a stream of JSON Lines. 
The format of records is not documented in the API documentation, + // although it's occasionally mentioned, e.g., https://docs.docker.com/reference/api/engine/version-history/#v148-api-changes + + // https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/pkg/jsonmessage/jsonmessage.go#L144 + // All fields are optional + type PullMessage struct { + Id string `json:"id"` // layer id + Status string `json:"status"` + ProgressDetail struct { + Current uint `json:"current"` // bytes + Total uint `json:"total"` // bytes + } `json:"progressDetail"` + ErrorDetail struct { + Message string `json:"message"` + } `json:"errorDetail"` } - var status bool + var pullCompleted bool pullErrors := make([]string, 0) - scanner := bufio.NewScanner(reader) + scanner := bufio.NewScanner(teeReader) for scanner.Scan() { line := scanner.Bytes() - var progressRow Progress - if err := json.Unmarshal(line, &progressRow); err != nil { + var pullMessage PullMessage + if err := json.Unmarshal(line, &pullMessage); err != nil { continue } - if progressRow.Status == "Downloading" { - current[progressRow.Id] = progressRow.ProgressDetail.Current - total[progressRow.Id] = progressRow.ProgressDetail.Total + if pullMessage.Status == "Downloading" { + current[pullMessage.Id] = pullMessage.ProgressDetail.Current + total[pullMessage.Id] = pullMessage.ProgressDetail.Total } - if progressRow.Status == "Download complete" { - current[progressRow.Id] = total[progressRow.Id] + if pullMessage.Status == "Download complete" { + current[pullMessage.Id] = total[pullMessage.Id] } - if progressRow.Error != "" { - log.Error(ctx, "error pulling image", "name", taskConfig.ImageName, "err", progressRow.Error) - pullErrors = append(pullErrors, progressRow.Error) + if pullMessage.ErrorDetail.Message != "" { + log.Error(ctx, "error pulling image", "name", taskConfig.ImageName, "err", pullMessage.ErrorDetail.Message) + pullErrors = append(pullErrors, pullMessage.ErrorDetail.Message) } - if 
strings.HasPrefix(progressRow.Status, "Status:") { - status = true - log.Debug(ctx, progressRow.Status) + // If the pull is successful, the last two entries must be: + // "Digest: sha256:{hash}" + // "Status: {message}" + // where {message} is either "Downloaded newer image for {image}" or "Image is up to date for {image}". + // See: https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/daemon/containerd/image_pull.go#L134-L152 + // See: https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/daemon/containerd/image_pull.go#L257-L263 + if strings.HasPrefix(pullMessage.Status, "Status:") { + pullCompleted = true + log.Debug(ctx, pullMessage.Status) } } duration := time.Since(startTime) - var currentBytes uint var totalBytes uint for _, v := range current { @@ -738,9 +767,13 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf for _, v := range total { totalBytes += v } - speed := bytesize.New(float64(currentBytes) / duration.Seconds()) - if status && currentBytes == totalBytes { + + if err := ctx.Err(); err != nil { + return tracerr.Errorf("image pull interrupted: downloaded %d bytes out of %d (%s/s): %w", currentBytes, totalBytes, speed, err) + } + + if pullCompleted { log.Debug(ctx, "image successfully pulled", "bytes", currentBytes, "bps", speed) } else { return tracerr.Errorf( @@ -749,21 +782,11 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf ) } - err = ctx.Err() - if err != nil { - return tracerr.Errorf("imagepull interrupted: downloaded %d bytes out of %d (%s/s): %w", currentBytes, totalBytes, speed, err) - } return nil } func (d *DockerRunner) createContainer(ctx context.Context, task *Task) error { - runnerDir, err := d.dockerParams.MakeRunnerDir(task.containerName) - if err != nil { - return tracerr.Wrap(err) - } - task.runnerDir = runnerDir - - mounts, err := d.dockerParams.DockerMounts(runnerDir) + mounts, err := d.dockerParams.DockerMounts(task.runnerDir) if err != nil { return
tracerr.Wrap(err) } diff --git a/runner/internal/shim/docker_test.go b/runner/internal/shim/docker_test.go index 35c8cbab6d..29a5e1afdb 100644 --- a/runner/internal/shim/docker_test.go +++ b/runner/internal/shim/docker_test.go @@ -26,8 +26,9 @@ func TestDocker_SSHServer(t *testing.T) { t.Parallel() params := &dockerParametersMock{ - commands: []string{"echo 1"}, - sshPort: nextPort(), + commands: []string{"echo 1"}, + sshPort: nextPort(), + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -58,6 +59,7 @@ func TestDocker_SSHServerConnect(t *testing.T) { commands: []string{"sleep 5"}, sshPort: nextPort(), publicSSHKey: string(publicBytes), + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -103,7 +105,8 @@ func TestDocker_ShmNoexecByDefault(t *testing.T) { t.Parallel() params := &dockerParametersMock{ - commands: []string{"mount | grep '/dev/shm .*size=65536k' | grep noexec"}, + commands: []string{"mount | grep '/dev/shm .*size=65536k' | grep noexec"}, + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -125,7 +128,8 @@ func TestDocker_ShmExecIfSizeSpecified(t *testing.T) { t.Parallel() params := &dockerParametersMock{ - commands: []string{"mount | grep '/dev/shm .*size=1024k' | grep -v noexec"}, + commands: []string{"mount | grep '/dev/shm .*size=1024k' | grep -v noexec"}, + runnerDir: t.TempDir(), } timeout := 180 // seconds @@ -148,6 +152,7 @@ type dockerParametersMock struct { commands []string sshPort int publicSSHKey string + runnerDir string } func (c *dockerParametersMock) DockerPrivileged() bool { @@ -184,7 +189,7 @@ func (c *dockerParametersMock) DockerMounts(string) ([]mount.Mount, error) { } func (c *dockerParametersMock) MakeRunnerDir(string) (string, error) { - return "", nil + return c.runnerDir, nil } /* Utilities */