@@ -274,6 +274,13 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error {
274274 cfg := task .config
275275 var err error
276276
277+ runnerDir , err := d .dockerParams .MakeRunnerDir (task .containerName )
278+ if err != nil {
279+ return tracerr .Wrap (err )
280+ }
281+ task .runnerDir = runnerDir
282+ log .Debug (ctx , "runner dir" , "task" , task .ID , "path" , runnerDir )
283+
277284 if cfg .GPU != 0 {
278285 gpuIDs , err := d .gpuLock .Acquire (ctx , cfg .GPU )
279286 if err != nil {
@@ -335,7 +342,10 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error {
335342 if err := d .tasks .Update (task ); err != nil {
336343 return tracerr .Errorf ("%w: failed to update task %s: %w" , ErrInternal , task .ID , err )
337344 }
338- if err = pullImage (pullCtx , d .client , cfg ); err != nil {
345+ // Although it's called "runner dir", we also use it for shim task-related data.
346+ // Maybe we should rename it to "task dir" (including the `/root/.dstack/runners` dir on the host).
347+ pullLogPath := filepath .Join (runnerDir , "pull.log" )
348+ if err = pullImage (pullCtx , d .client , cfg , pullLogPath ); err != nil {
339349 errMessage := fmt .Sprintf ("pullImage error: %s" , err .Error ())
340350 log .Error (ctx , errMessage )
341351 task .SetStatusTerminated (string (types .TerminationReasonCreatingContainerError ), errMessage )
@@ -655,7 +665,7 @@ func mountDisk(ctx context.Context, deviceName, mountPoint string, fsRootPerms o
655665 return nil
656666}
657667
658- func pullImage (ctx context.Context , client docker.APIClient , taskConfig TaskConfig ) error {
668+ func pullImage (ctx context.Context , client docker.APIClient , taskConfig TaskConfig , logPath string ) error {
659669 if ! strings .Contains (taskConfig .ImageName , ":" ) {
660670 taskConfig .ImageName += ":latest"
661671 }
@@ -685,51 +695,70 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf
685695 if err != nil {
686696 return tracerr .Wrap (err )
687697 }
688- defer func () { _ = reader .Close () }()
698+ defer reader .Close ()
699+
700+ logFile , err := os .OpenFile (logPath , os .O_CREATE | os .O_TRUNC | os .O_WRONLY , 0o644 )
701+ if err != nil {
702+ return tracerr .Wrap (err )
703+ }
704+ defer logFile .Close ()
705+
706+ teeReader := io .TeeReader (reader , logFile )
689707
690708 current := make (map [string ]uint )
691709 total := make (map [string ]uint )
692710
693- type ProgressDetail struct {
694- Current uint `json:"current"`
695- Total uint `json:"total"`
696- }
697- type Progress struct {
698- Id string `json:"id"`
699- Status string `json:"status"`
700- ProgressDetail ProgressDetail `json:"progressDetail"` //nolint:tagliatelle
701- Error string `json:"error"`
711+ // dockerd reports pulling progress as a stream of JSON Lines. The format of records is not documented in the API documentation,
712+ // although it's occasionally mentioned, e.g., https://docs.docker.com/reference/api/engine/version-history/#v148-api-changes
713+
714+ // https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/pkg/jsonmessage/jsonmessage.go#L144
715+ // All fields are optional
716+ type PullMessage struct {
717+ Id string `json:"id"` // layer id
718+ Status string `json:"status"`
719+ ProgressDetail struct {
720+ Current uint `json:"current"` // bytes
721+ Total uint `json:"total"` // bytes
722+ } `json:"progressDetail"`
723+ ErrorDetail struct {
724+ Message string `json:"message"`
725+ } `json:"errorDetail"`
702726 }
703727
704- var status bool
728+ var pullCompleted bool
705729 pullErrors := make ([]string , 0 )
706730
707- scanner := bufio .NewScanner (reader )
731+ scanner := bufio .NewScanner (teeReader )
708732 for scanner .Scan () {
709733 line := scanner .Bytes ()
710- var progressRow Progress
711- if err := json .Unmarshal (line , & progressRow ); err != nil {
734+ var pullMessage PullMessage
735+ if err := json .Unmarshal (line , & pullMessage ); err != nil {
712736 continue
713737 }
714- if progressRow .Status == "Downloading" {
715- current [progressRow .Id ] = progressRow .ProgressDetail .Current
716- total [progressRow .Id ] = progressRow .ProgressDetail .Total
738+ if pullMessage .Status == "Downloading" {
739+ current [pullMessage .Id ] = pullMessage .ProgressDetail .Current
740+ total [pullMessage .Id ] = pullMessage .ProgressDetail .Total
717741 }
718- if progressRow .Status == "Download complete" {
719- current [progressRow .Id ] = total [progressRow .Id ]
742+ if pullMessage .Status == "Download complete" {
743+ current [pullMessage .Id ] = total [pullMessage .Id ]
720744 }
721- if progressRow . Error != "" {
722- log .Error (ctx , "error pulling image" , "name" , taskConfig .ImageName , "err" , progressRow . Error )
723- pullErrors = append (pullErrors , progressRow . Error )
745+ if pullMessage . ErrorDetail . Message != "" {
746+ log .Error (ctx , "error pulling image" , "name" , taskConfig .ImageName , "err" , pullMessage . ErrorDetail . Message )
747+ pullErrors = append (pullErrors , pullMessage . ErrorDetail . Message )
724748 }
725- if strings .HasPrefix (progressRow .Status , "Status:" ) {
726- status = true
727- log .Debug (ctx , progressRow .Status )
749+ // If the pull is successful, the last two entries must be:
750+ // "Digest: sha256:<hash>"
751+ // "Status: <message>"
752+ // where <message> is either "Downloaded newer image for <tag>" or "Image is up to date for <tag>".
753+ // See: https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/daemon/containerd/image_pull.go#L134-L152
754+ // See: https://github.com/moby/moby/blob/e77ff99ede5ee5952b3a9227863552ae6e5b6fb1/daemon/containerd/image_pull.go#L257-L263
755+ if strings .HasPrefix (pullMessage .Status , "Status:" ) {
756+ pullCompleted = true
757+ log .Debug (ctx , pullMessage .Status )
728758 }
729759 }
730760
731761 duration := time .Since (startTime )
732-
733762 var currentBytes uint
734763 var totalBytes uint
735764 for _ , v := range current {
@@ -738,9 +767,13 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf
738767 for _ , v := range total {
739768 totalBytes += v
740769 }
741-
742770 speed := bytesize .New (float64 (currentBytes ) / duration .Seconds ())
743- if status && currentBytes == totalBytes {
771+
772+ if err := ctx .Err (); err != nil {
773+ return tracerr .Errorf ("image pull interrupted: downloaded %d bytes out of %d (%s/s): %w" , currentBytes , totalBytes , speed , err )
774+ }
775+
776+ if pullCompleted {
744777 log .Debug (ctx , "image successfully pulled" , "bytes" , currentBytes , "bps" , speed )
745778 } else {
746779 return tracerr .Errorf (
@@ -749,21 +782,11 @@ func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConf
749782 )
750783 }
751784
752- err = ctx .Err ()
753- if err != nil {
754- return tracerr .Errorf ("imagepull interrupted: downloaded %d bytes out of %d (%s/s): %w" , currentBytes , totalBytes , speed , err )
755- }
756785 return nil
757786}
758787
759788func (d * DockerRunner ) createContainer (ctx context.Context , task * Task ) error {
760- runnerDir , err := d .dockerParams .MakeRunnerDir (task .containerName )
761- if err != nil {
762- return tracerr .Wrap (err )
763- }
764- task .runnerDir = runnerDir
765-
766- mounts , err := d .dockerParams .DockerMounts (runnerDir )
789+ mounts , err := d .dockerParams .DockerMounts (task .runnerDir )
767790 if err != nil {
768791 return tracerr .Wrap (err )
769792 }
0 commit comments