Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion docs/docs/reference/dstack.yml/dev-environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,21 @@ The `dev-environment` configuration type allows running [dev environments](../..
The short syntax for volumes is a colon-separated string in the form of `source:destination`

* `volume-name:/container/path` for network volumes
* `/instance/path:/container/path` for instance volumes
* `/instance/path:/container/path` for instance volumes

### `files[n]` { #_files data-toc-label="files" }

#SCHEMA# dstack._internal.core.models.files.FilePathMapping
overrides:
show_root_heading: false
type:
required: true

??? info "Short syntax"

The short syntax for files is a colon-separated string in the form of `local_path[:path]` where
`path` is optional and can be omitted if it's equal to `local_path`.

* `~/.bashrc`, same as `~/.bashrc:~/.bashrc`
* `/opt/myorg`, same as `/opt/myorg/` and `/opt/myorg:/opt/myorg`
* `libs/patched_libibverbs.so.1:/lib/x86_64-linux-gnu/libibverbs.so.1`
17 changes: 17 additions & 0 deletions docs/docs/reference/dstack.yml/service.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,20 @@ The `service` configuration type allows running [services](../../concepts/servic

* `volume-name:/container/path` for network volumes
* `/instance/path:/container/path` for instance volumes

### `files[n]` { #_files data-toc-label="files" }

#SCHEMA# dstack._internal.core.models.files.FilePathMapping
overrides:
show_root_heading: false
type:
required: true

??? info "Short syntax"

The short syntax for files is a colon-separated string in the form of `local_path[:path]` where
`path` is optional and can be omitted if it's equal to `local_path`.

* `~/.bashrc`, same as `~/.bashrc:~/.bashrc`
* `/opt/myorg`, same as `/opt/myorg/` and `/opt/myorg:/opt/myorg`
* `libs/patched_libibverbs.so.1:/lib/x86_64-linux-gnu/libibverbs.so.1`
17 changes: 17 additions & 0 deletions docs/docs/reference/dstack.yml/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,20 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md).

* `volume-name:/container/path` for network volumes
* `/instance/path:/container/path` for instance volumes

### `files[n]` { #_files data-toc-label="files" }

#SCHEMA# dstack._internal.core.models.files.FilePathMapping
overrides:
show_root_heading: false
type:
required: true

??? info "Short syntax"

The short syntax for files is a colon-separated string in the form of `local_path[:path]` where
`path` is optional and can be omitted if it's equal to `local_path`.

* `~/.bashrc`, same as `~/.bashrc:~/.bashrc`
* `/opt/myorg`, same as `/opt/myorg/` and `/opt/myorg:/opt/myorg`
* `libs/patched_libibverbs.so.1:/lib/x86_64-linux-gnu/libibverbs.so.1`
11 changes: 4 additions & 7 deletions runner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ go 1.23
require (
github.com/alexellis/go-execute/v2 v2.2.1
github.com/bluekeyes/go-gitdiff v0.7.2
github.com/codeclysm/extract/v4 v4.0.0
github.com/creack/pty v1.1.24
github.com/docker/docker v26.0.0+incompatible
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.5.0
github.com/go-git/go-git/v5 v5.12.0
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f
github.com/gorilla/websocket v1.5.1
github.com/inhies/go-bytesize v0.0.0-20220417184213-4913239db9cf
github.com/prometheus/procfs v0.15.1
github.com/shirou/gopsutil/v4 v4.24.11
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.10.0
Expand Down Expand Up @@ -78,12 +81,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gotest.tools/v3 v3.5.0 // indirect
)

require (
github.com/codeclysm/extract/v3 v3.1.1
github.com/gorilla/websocket v1.5.1
github.com/prometheus/procfs v0.15.1
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.0 // indirect
)
8 changes: 4 additions & 4 deletions runner/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ github.com/alexellis/go-execute/v2 v2.2.1 h1:4Ye3jiCKQarstODOEmqDSRCqxMHLkC92Bhs
github.com/alexellis/go-execute/v2 v2.2.1/go.mod h1:FMdRnUTiFAmYXcv23txrp3VYZfLo24nMpiIneWgKHTQ=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/arduino/go-paths-helper v1.2.0 h1:qDW93PR5IZUN/jzO4rCtexiwF8P4OIcOmcSgAYLZfY4=
github.com/arduino/go-paths-helper v1.2.0/go.mod h1:HpxtKph+g238EJHq4geEPv9p+gl3v5YYu35Yb+w31Ck=
github.com/arduino/go-paths-helper v1.12.1 h1:WkxiVUxBjKWlLMiMuYy8DcmVrkxdP7aKxQOAq7r2lVM=
github.com/arduino/go-paths-helper v1.12.1/go.mod h1:jcpW4wr0u69GlXhTYydsdsqAjLaYK5n7oWHfKqOG6LM=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/bluekeyes/go-gitdiff v0.7.2 h1:42jrcVZdjjxXtVsFNYTo/I6T1ZvIiQL+iDDLiH904hw=
Expand All @@ -26,8 +26,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA=
github.com/cloudflare/circl v1.3.7 h1:qlCDlTPz2n9fu58M0Nh1J/JzcFpfgkFHHX3O35r5vcU=
github.com/cloudflare/circl v1.3.7/go.mod h1:sRTcRWXGLrKw6yIGJ+l7amYJFfAXbZG0kBSc8r4zxgA=
github.com/codeclysm/extract/v3 v3.1.1 h1:iHZtdEAwSTqPrd+1n4jfhr1qBhUWtHlMTjT90+fJVXg=
github.com/codeclysm/extract/v3 v3.1.1/go.mod h1:ZJi80UG2JtfHqJI+lgJSCACttZi++dHxfWuPaMhlOfQ=
github.com/codeclysm/extract/v4 v4.0.0 h1:H87LFsUNaJTu2e/8p/oiuiUsOK/TaPQ5wxsjPnwPEIY=
github.com/codeclysm/extract/v4 v4.0.0/go.mod h1:SFju1lj6as7FvUgalpSct7torJE0zttbJUWtryPRG6s=
github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I=
github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo=
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
Expand Down
10 changes: 5 additions & 5 deletions runner/internal/api/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,18 @@ func DecodeJSONBody(w http.ResponseWriter, r *http.Request, dst interface{}, all
func JSONResponseHandler(handler func(http.ResponseWriter, *http.Request) (interface{}, error)) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
status := 200
msg := ""
errMsg := ""
var apiErr *Error

body, err := handler(w, r)
if err != nil {
if errors.As(err, &apiErr) {
status = apiErr.Status
msg = apiErr.Error()
log.Warning(r.Context(), "API error", "err", apiErr.Err)
errMsg = apiErr.Error()
log.Warning(r.Context(), "API error", "err", errMsg, "status", status)
} else {
status = http.StatusInternalServerError
log.Error(r.Context(), "Unexpected API error", "err", err)
log.Error(r.Context(), "Unexpected API error", "err", err, "status", status)
}
}

Expand All @@ -125,7 +125,7 @@ func JSONResponseHandler(handler func(http.ResponseWriter, *http.Request) (inter
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(body)
} else {
http.Error(w, msg, status)
http.Error(w, errMsg, status)
}

log.Debug(r.Context(), "", "method", r.Method, "endpoint", r.URL.Path, "status", status)
Expand Down
2 changes: 2 additions & 0 deletions runner/internal/executor/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"context"
"io"

"github.com/dstackai/dstack/runner/internal/schemas"
"github.com/dstackai/dstack/runner/internal/types"
Expand All @@ -22,6 +23,7 @@ type Executor interface {
termination_message string,
)
SetRunnerState(state string)
AddFileArchive(id string, src io.Reader) error
Lock()
RLock()
RUnlock()
Expand Down
38 changes: 30 additions & 8 deletions runner/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type RunExecutor struct {
tempDir string
homeDir string
workingDir string
archiveDir string
sshPort int
uid uint32
currentUid uint32

run schemas.Run
jobSpec schemas.JobSpec
Expand Down Expand Up @@ -77,8 +78,9 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
tempDir: tempDir,
homeDir: homeDir,
workingDir: workingDir,
archiveDir: filepath.Join(tempDir, "file_archives"),
sshPort: sshPort,
uid: uid,
currentUid: uid,

mu: mu,
state: WaitSubmit,
Expand Down Expand Up @@ -131,6 +133,28 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
ctx = log.WithLogger(ctx, log.NewEntry(logger, int(log.DefaultEntry.Logger.Level))) // todo loglevel
log.Info(ctx, "Run job", "log_level", log.GetLogger(ctx).Logger.Level.String())

if ex.jobSpec.User != nil {
if err := fillUser(ex.jobSpec.User); err != nil {
ex.SetJobStateWithTerminationReason(
ctx,
types.JobStateFailed,
types.TerminationReasonExecutorError,
fmt.Sprintf("Failed to fill in the job user fields (%s)", err),
)
return gerrors.Wrap(err)
}
}

if err := ex.setupFiles(ctx); err != nil {
ex.SetJobStateWithTerminationReason(
ctx,
types.JobStateFailed,
types.TerminationReasonExecutorError,
fmt.Sprintf("Failed to set up files (%s)", err),
)
return gerrors.Wrap(err)
}

if err := ex.setupRepo(ctx); err != nil {
ex.SetJobStateWithTerminationReason(
ctx,
Expand All @@ -140,6 +164,7 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
)
return gerrors.Wrap(err)
}

cleanupCredentials, err := ex.setupCredentials(ctx)
if err != nil {
ex.SetJobState(ctx, types.JobStateFailed)
Expand Down Expand Up @@ -300,16 +325,13 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error

user := ex.jobSpec.User
if user != nil {
if err := fillUser(user); err != nil {
return gerrors.Wrap(err)
}
log.Trace(
ctx, "Using credentials",
"uid", *user.Uid, "gid", *user.Gid, "groups", user.GroupIds,
"username", user.GetUsername(), "groupname", user.GetGroupname(),
"home", user.HomeDir,
)
log.Trace(ctx, "Current user", "uid", ex.uid)
log.Trace(ctx, "Current user", "uid", ex.currentUid)

// 1. Ideally, We should check uid, gid, and supplementary groups mismatches,
// but, for the sake of simplicity, we only check uid. Unprivileged runner
Expand All @@ -318,8 +340,8 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
// 2. Strictly speaking, we need CAP_SETUID and CAP_GUID (for Cmd.Start()->
// Cmd.SysProcAttr.Credential) and CAP_CHOWN (for startCommand()->os.Chown()),
// but for the sake of simplicity we instead check if we are root or not
if *user.Uid != ex.uid && ex.uid != 0 {
return gerrors.Newf("cannot start job as %d, current uid is %d", *user.Uid, ex.uid)
if *user.Uid != ex.currentUid && ex.currentUid != 0 {
return gerrors.Newf("cannot start job as %d, current uid is %d", *user.Uid, ex.currentUid)
}

if cmd.SysProcAttr == nil {
Expand Down
132 changes: 132 additions & 0 deletions runner/internal/executor/files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package executor

import (
"context"
"errors"
"fmt"
"io"
"os"
"path"
"regexp"
"slices"
"strings"

"github.com/codeclysm/extract/v4"
"github.com/dstackai/dstack/runner/internal/gerrors"
"github.com/dstackai/dstack/runner/internal/log"
)

// renameRegex matches the first slash-separated component of a path (capture
// group 1) together with the separator that follows it, or the end of the
// string when the path has a single component (capture group 2).
var renameRegex = regexp.MustCompile(`^([^/]*)(/|$)`)

// AddFileArchive streams src into a file named id inside the executor's
// archive directory, creating the directory if it does not exist yet. A file
// already stored under the same id is truncated and overwritten.
//
// An error is returned if the directory or the file cannot be created, if
// copying fails, or if the file cannot be closed cleanly.
//
// NOTE(review): id is joined into the path as-is; presumably callers pass a
// server-generated identifier without path separators — confirm.
func (ex *RunExecutor) AddFileArchive(id string, src io.Reader) error {
	if err := os.MkdirAll(ex.archiveDir, 0o755); err != nil {
		return gerrors.Wrap(err)
	}
	archivePath := path.Join(ex.archiveDir, id)
	archive, err := os.Create(archivePath)
	if err != nil {
		return gerrors.Wrap(err)
	}
	if _, err := io.Copy(archive, src); err != nil {
		// The copy error is the one worth reporting; closing is best-effort here.
		_ = archive.Close()
		return gerrors.Wrap(err)
	}
	// Close may surface a deferred write error, so it must not be ignored for
	// a file that has just been written.
	if err := archive.Close(); err != nil {
		return gerrors.Wrap(err)
	}
	return nil
}

// setupFiles extracts every file archive listed in the run spec to its
// destination path. It must be called from Run.
//
// Destination paths are resolved as follows:
//   - "~" and "~/rest" are expanded using the job user's home directory when
//     it is set, and the working directory otherwise;
//   - other relative paths are resolved against the working directory;
//   - `~username[/path/to]` is not supported.
//
// Missing parent directories are created, anything already present at the
// destination is removed, and the first path component of every archive entry
// is renamed to the last element of the destination. When the job user has a
// uid or gid, extracted entries are chowned accordingly (best effort).
//
// Archives are removed from the archive directory when the function returns,
// whether it succeeds or fails, so an archive id referenced by more than one
// mapping can still be opened.
func (ex *RunExecutor) setupFiles(ctx context.Context) error {
	homeDir := ex.workingDir
	uid := -1
	gid := -1
	if user := ex.jobSpec.User; user != nil {
		if user.HomeDir != "" {
			homeDir = user.HomeDir
		}
		if user.Uid != nil {
			uid = int(*user.Uid)
		}
		if user.Gid != nil {
			gid = int(*user.Gid)
		}
	}

	// Archives that were successfully opened; they are deleted on return.
	var openedArchives []string
	defer func() {
		for _, archivePath := range openedArchives {
			// The same archive may have been opened more than once, so a
			// missing file is not worth a warning.
			if err := os.Remove(archivePath); err != nil && !errors.Is(err, os.ErrNotExist) {
				log.Warning(ctx, "Failed to remove archive", "path", archivePath, "err", err)
			}
		}
	}()

	for _, fa := range ex.run.RunSpec.FileArchives {
		log.Trace(ctx, "Extracting file archive", "id", fa.Id, "path", fa.Path)

		p := path.Clean(fa.Path)
		// `~username[/path/to]` is not supported
		switch {
		case p == "~":
			// Clean so that a home directory with a trailing slash still
			// splits into a parent and a non-empty last element below.
			p = path.Clean(homeDir)
		case strings.HasPrefix(p, "~/"):
			p = path.Join(homeDir, strings.TrimPrefix(p, "~/"))
		case !path.IsAbs(p):
			p = path.Join(ex.workingDir, p)
		}
		dir, root := path.Split(p)
		if root == "" {
			// Only happens when the destination resolves to "/"; going on
			// would call os.RemoveAll on the filesystem root.
			return gerrors.Newf("cannot extract file archive %s: destination %s has no final path element", fa.Id, fa.Path)
		}
		if err := mkdirAll(ctx, dir, uid, gid); err != nil {
			return gerrors.Wrap(err)
		}

		if err := os.RemoveAll(p); err != nil {
			log.Warning(ctx, "Failed to remove", "path", p, "err", err)
		}

		archivePath := path.Join(ex.archiveDir, fa.Id)
		archive, err := os.Open(archivePath)
		if err != nil {
			return gerrors.Wrap(err)
		}
		openedArchives = append(openedArchives, archivePath)

		var extracted []string
		// "$" is special in a regexp replacement template, so escape it in the
		// destination name before appending the "$2" group reference.
		repl := fmt.Sprintf("%s$2", strings.ReplaceAll(root, "$", "$$"))
		renameAndRemember := func(name string) string {
			name = renameRegex.ReplaceAllString(name, repl)
			extracted = append(extracted, name)
			return name
		}
		err = extract.Tar(ctx, archive, dir, renameAndRemember)
		// Close right away instead of deferring, so handles do not pile up
		// across iterations. The file was only read, so a close error carries
		// no useful information.
		_ = archive.Close()
		if err != nil {
			return gerrors.Wrap(err)
		}

		if uid != -1 || gid != -1 {
			for _, name := range extracted {
				// Lchown changes the entry itself; Chown would follow a
				// symlink from the archive and change its target instead.
				if err := os.Lchown(path.Join(dir, name), uid, gid); err != nil {
					log.Warning(ctx, "Failed to chown", "path", name, "err", err)
				}
			}
		}
	}

	return nil
}

func mkdirAll(ctx context.Context, p string, uid int, gid int) error {
var paths []string
for {
p = path.Dir(p)
if p == "/" {
break
}
paths = append(paths, p)
}
for _, p := range slices.Backward(paths) {
if _, err := os.Stat(p); errors.Is(err, os.ErrNotExist) {
if err := os.Mkdir(p, 0o755); err != nil {
return err
}
if err := os.Chown(p, uid, gid); err != nil {
log.Warning(ctx, "Failed to chown", "path", p, "err", err)
}
} else if err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion runner/internal/executor/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os/exec"
"path/filepath"

"github.com/codeclysm/extract/v3"
"github.com/codeclysm/extract/v4"
"github.com/dstackai/dstack/runner/internal/gerrors"
"github.com/dstackai/dstack/runner/internal/log"
"github.com/dstackai/dstack/runner/internal/repo"
Expand Down
Loading
Loading