Skip to content

Commit 002ea51

Browse files
Add per-job hourly log quota enforced on runner (#3668)
1 parent 2222a63 commit 002ea51

File tree

9 files changed

+124
-7
lines changed

9 files changed

+124
-7
lines changed

runner/internal/common/types/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ const (
1010
TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user"
1111
TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server"
1212
TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded"
13+
TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded"
1314
)

runner/internal/runner/executor/executor.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,17 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
261261
default:
262262
}
263263

264+
if errors.Is(err, ErrLogQuotaExceeded) {
265+
log.Error(ctx, "Log quota exceeded", "quota", ex.jobLogs.quota)
266+
ex.SetJobStateWithTerminationReason(
267+
ctx,
268+
schemas.JobStateFailed,
269+
types.TerminationReasonLogQuotaExceeded,
270+
fmt.Sprintf("Job log output exceeded the hourly quota of %d bytes", ex.jobLogs.quota),
271+
)
272+
return fmt.Errorf("log quota exceeded: %w", err)
273+
}
274+
264275
// todo fail reason?
265276
log.Error(ctx, "Exec failed", "err", err)
266277
var exitError *exec.ExitError
@@ -283,6 +294,7 @@ func (ex *RunExecutor) SetJob(body schemas.SubmitBody) {
283294
ex.clusterInfo = body.ClusterInfo
284295
ex.secrets = body.Secrets
285296
ex.repoCredentials = body.RepoCredentials
297+
ex.jobLogs.SetQuota(body.LogQuotaHour)
286298
ex.state = WaitCode
287299
}
288300

@@ -586,18 +598,51 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
586598
defer func() { _ = cmd.Wait() }() // release resources if copy fails
587599

588600
stripper := ansistrip.NewWriter(ex.jobLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
589-
defer func() { _ = stripper.Close() }()
590601
logger := io.MultiWriter(jobLogFile, ex.jobWsLogs, stripper)
591-
_, err = io.Copy(logger, ptm)
592-
if err != nil && !isPtyError(err) {
593-
return fmt.Errorf("copy command output: %w", err)
602+
603+
if err := ex.copyOutputWithQuota(cmd, ptm, stripper, logger); err != nil {
604+
return err
594605
}
595606
if err = cmd.Wait(); err != nil {
596607
return fmt.Errorf("wait for command: %w", err)
597608
}
598609
return nil
599610
}
600611

612+
// copyOutputWithQuota streams process output through the log pipeline and
613+
// monitors for log quota exceeded. The quota signal is out-of-band (via channel)
614+
// because the ansistrip writer is async and swallows downstream write errors.
615+
func (ex *RunExecutor) copyOutputWithQuota(cmd *exec.Cmd, ptm io.Reader, stripper io.Closer, logger io.Writer) error {
616+
copyDone := make(chan error, 1)
617+
go func() {
618+
_, err := io.Copy(logger, ptm)
619+
copyDone <- err
620+
}()
621+
622+
// Wait for either io.Copy to finish or quota to be exceeded.
623+
var copyErr error
624+
select {
625+
case copyErr = <-copyDone:
626+
case <-ex.jobLogs.QuotaExceeded():
627+
_ = cmd.Process.Kill()
628+
<-copyDone
629+
}
630+
631+
// Flush the ansistrip buffer — may also trigger quota exceeded.
632+
_ = stripper.Close()
633+
634+
select {
635+
case <-ex.jobLogs.QuotaExceeded():
636+
return ErrLogQuotaExceeded
637+
default:
638+
}
639+
640+
if copyErr != nil && !isPtyError(copyErr) {
641+
return fmt.Errorf("copy command output: %w", copyErr)
642+
}
643+
return nil
644+
}
645+
601646
// setupGitCredentials must be called from Run after setJobUser
602647
func (ex *RunExecutor) setupGitCredentials(ctx context.Context) (func(), error) {
603648
if ex.repoCredentials == nil {

runner/internal/runner/executor/executor_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,27 @@ func TestExecutor_MaxDuration(t *testing.T) {
141141
assert.ErrorContains(t, err, "killed")
142142
}
143143

144+
func TestExecutor_LogQuota(t *testing.T) {
145+
if testing.Short() {
146+
t.Skip()
147+
}
148+
149+
ex := makeTestExecutor(t)
150+
ex.killDelay = 500 * time.Millisecond
151+
// Output >100 bytes to trigger the quota
152+
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "for i in $(seq 1 20); do echo 'This line is long enough to exceed the quota easily'; done")
153+
ex.jobLogs.SetQuota(100)
154+
makeCodeTar(t, ex)
155+
156+
err := ex.Run(t.Context())
157+
assert.ErrorContains(t, err, "log quota exceeded")
158+
159+
// Verify the termination state was set
160+
history := ex.GetHistory(0)
161+
lastState := history.JobStates[len(history.JobStates)-1]
162+
assert.Equal(t, schemas.JobStateFailed, lastState.State)
163+
}
164+
144165
func TestExecutor_RemoteRepo(t *testing.T) {
145166
if testing.Short() {
146167
t.Skip()

runner/internal/runner/executor/logs.go

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,65 @@
11
package executor
22

33
import (
4+
"errors"
5+
"math"
46
"sync"
7+
"time"
58

69
"github.com/dstackai/dstack/runner/internal/runner/schemas"
710
)
811

12+
var ErrLogQuotaExceeded = errors.New("log quota exceeded")
13+
914
type appendWriter struct {
1015
mu *sync.RWMutex // shares with executor
1116
history []schemas.LogEvent
1217
timestamp *MonotonicTimestamp // shares with executor
18+
19+
quota int // bytes per hour, 0 = unlimited
20+
bytesInHour int // bytes written in current hour bucket
21+
currentHour int // monotonic hour bucket index since timeStarted
22+
timeStarted time.Time // monotonic reference point for hour buckets
23+
quotaExceeded chan struct{} // closed when quota is exceeded (out-of-band signal)
24+
exceededOnce sync.Once
1325
}
1426

1527
func newAppendWriter(mu *sync.RWMutex, timestamp *MonotonicTimestamp) *appendWriter {
1628
return &appendWriter{
17-
mu: mu,
18-
history: make([]schemas.LogEvent, 0),
19-
timestamp: timestamp,
29+
mu: mu,
30+
history: make([]schemas.LogEvent, 0),
31+
timestamp: timestamp,
32+
quotaExceeded: make(chan struct{}),
2033
}
2134
}
2235

36+
func (w *appendWriter) SetQuota(quota int) {
37+
w.quota = quota
38+
w.timeStarted = time.Now()
39+
}
40+
41+
// QuotaExceeded returns a channel that is closed when the log quota is exceeded.
42+
func (w *appendWriter) QuotaExceeded() <-chan struct{} {
43+
return w.quotaExceeded
44+
}
45+
2346
func (w *appendWriter) Write(p []byte) (n int, err error) {
2447
w.mu.Lock()
2548
defer w.mu.Unlock()
2649

50+
if w.quota > 0 {
51+
hour := int(math.Floor(time.Since(w.timeStarted).Hours()))
52+
if hour != w.currentHour {
53+
w.bytesInHour = 0
54+
w.currentHour = hour
55+
}
56+
if w.bytesInHour+len(p) > w.quota {
57+
w.exceededOnce.Do(func() { close(w.quotaExceeded) })
58+
return 0, ErrLogQuotaExceeded
59+
}
60+
w.bytesInHour += len(p)
61+
}
62+
2763
pCopy := make([]byte, len(p))
2864
copy(pCopy, p)
2965
w.history = append(w.history, schemas.LogEvent{Message: pCopy, Timestamp: w.timestamp.Next()})

runner/internal/runner/schemas/schemas.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type SubmitBody struct {
3636
ClusterInfo ClusterInfo `json:"cluster_info"`
3737
Secrets map[string]string `json:"secrets"`
3838
RepoCredentials *RepoCredentials `json:"repo_credentials"`
39+
LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited
3940
}
4041

4142
type PullResponse struct {

src/dstack/_internal/core/models/runs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ class JobTerminationReason(str, Enum):
151151
CREATING_CONTAINER_ERROR = "creating_container_error"
152152
EXECUTOR_ERROR = "executor_error"
153153
MAX_DURATION_EXCEEDED = "max_duration_exceeded"
154+
LOG_QUOTA_EXCEEDED = "log_quota_exceeded"
154155

155156
def to_status(self) -> JobStatus:
156157
mapping = {
@@ -173,6 +174,7 @@ def to_status(self) -> JobStatus:
173174
self.CREATING_CONTAINER_ERROR: JobStatus.FAILED,
174175
self.EXECUTOR_ERROR: JobStatus.FAILED,
175176
self.MAX_DURATION_EXCEEDED: JobStatus.TERMINATED,
177+
self.LOG_QUOTA_EXCEEDED: JobStatus.FAILED,
176178
}
177179
return mapping[self]
178180

@@ -205,6 +207,7 @@ def to_error(self) -> Optional[str]:
205207
JobTerminationReason.CREATING_CONTAINER_ERROR: "runner error",
206208
JobTerminationReason.EXECUTOR_ERROR: "executor error",
207209
JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded",
210+
JobTerminationReason.LOG_QUOTA_EXCEEDED: "log quota exceeded",
208211
}
209212
return error_mapping.get(self)
210213

src/dstack/_internal/server/schemas/runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ class SubmitBody(CoreModel):
103103
cluster_info: Annotated[Optional[ClusterInfo], Field(include=True)]
104104
secrets: Annotated[Optional[Dict[str, str]], Field(include=True)]
105105
repo_credentials: Annotated[Optional[RemoteRepoCreds], Field(include=True)]
106+
log_quota_hour: Annotated[Optional[int], Field(include=True)] = None
107+
"""Maximum bytes of log output per hour. None means unlimited."""
106108
# TODO: remove `run_spec` once instances deployed with 0.19.8 or earlier are no longer supported.
107109
run_spec: Annotated[
108110
RunSpec,

src/dstack/_internal/server/services/runner/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from dstack._internal.core.models.resources import Memory
1616
from dstack._internal.core.models.runs import ClusterInfo, Job, Run
1717
from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint
18+
from dstack._internal.server import settings as server_settings
1819
from dstack._internal.server.schemas.instances import InstanceCheck
1920
from dstack._internal.server.schemas.runner import (
2021
ComponentInfo,
@@ -93,13 +94,15 @@ def submit_job(
9394
merged_env.update(job_spec.env)
9495
job_spec = job_spec.copy(deep=True)
9596
job_spec.env = merged_env
97+
quota = server_settings.SERVER_LOG_QUOTA_PER_JOB_HOUR
9698
body = SubmitBody(
9799
run=run,
98100
job_spec=job_spec,
99101
job_submission=job.job_submissions[-1],
100102
cluster_info=cluster_info,
101103
secrets=secrets,
102104
repo_credentials=repo_credentials,
105+
log_quota_hour=quota if quota > 0 else None,
103106
run_spec=run.run_spec,
104107
)
105108
resp = requests.post(

src/dstack/_internal/server/settings.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,11 @@
133133

134134
SERVER_TEMPLATES_REPO = os.getenv("DSTACK_SERVER_TEMPLATES_REPO")
135135

136+
# Per-job log quota: maximum bytes of log output per calendar hour. 0 = unlimited.
137+
SERVER_LOG_QUOTA_PER_JOB_HOUR = int(
138+
os.getenv("DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR", 50 * 1024 * 1024) # 50 MB
139+
)
140+
136141
# Development settings
137142

138143
SQL_ECHO_ENABLED = os.getenv("DSTACK_SQL_ECHO_ENABLED") is not None

0 commit comments

Comments
 (0)