Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions frontend/src/libs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,6 @@ export const riseRouterException = (status = 404, json = 'Not Found'): never =>
throw new Response(json, { status });
};

export const base64ToArrayBuffer = (base64: string) => {
const binaryString = atob(base64);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
return bytes;
};

export const isValidUrl = (urlString: string) => {
try {
return Boolean(new URL(urlString));
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/Runs/Details/Logs/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export const Logs: React.FC<IProps> = ({ className, projectName, runName, jobSub

const writeDataToTerminal = (logs: ILogItem[]) => {
logs.forEach((logItem) => {
terminalInstance.current.write(logItem.message);
terminalInstance.current.write(logItem.message.replace(/(?<!\r)\n/g, '\r\n'));
});

fitAddonInstance.current.fit();
Expand Down
3 changes: 1 addition & 2 deletions frontend/src/services/project.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { API } from 'api';
import { createApi, fetchBaseQuery } from '@reduxjs/toolkit/query/react';

import { base64ToArrayBuffer } from 'libs';
import fetchBaseQueryHeaders from 'libs/fetchBaseQueryHeaders';

// Helper function to transform backend response to frontend format
Expand Down Expand Up @@ -131,7 +130,7 @@ export const projectApi = createApi({
transformResponse: (response: { logs: ILogItem[]; next_token: string }) => {
const logs = response.logs.map((logItem) => ({
...logItem,
message: base64ToArrayBuffer(logItem.message as string),
message: logItem.message,
}));

return {
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/types/log.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
declare interface ILogItem {
log_source: 'stdout' | 'stderr';
timestamp: string;
message: string | Uint8Array;
message: string;
}

declare type TRequestLogsParams = {
Expand Down
4 changes: 3 additions & 1 deletion runner/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/dstackai/dstack/runner

go 1.23
go 1.23.8

require (
github.com/alexellis/go-execute/v2 v2.2.1
Expand All @@ -10,6 +10,7 @@ require (
github.com/docker/docker v26.0.0+incompatible
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.5.0
github.com/dstackai/ansistrip v0.0.6
github.com/go-git/go-git/v5 v5.12.0
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f
github.com/gorilla/websocket v1.5.1
Expand Down Expand Up @@ -62,6 +63,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/skeema/knownhosts v1.2.2 // indirect
github.com/tidwall/btree v1.7.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/ulikunitz/xz v0.5.12 // indirect
Expand Down
4 changes: 4 additions & 0 deletions runner/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dstackai/ansistrip v0.0.6 h1:6qqeDNWt8NoqfkY1CxKUvdHpJzBl89LOE3wMwptVpaI=
github.com/dstackai/ansistrip v0.0.6/go.mod h1:w3ejXI0twxDv6bPXhkOaPeYdbwz2nwcrcvFoZGqi9F0=
github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE=
github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
Expand Down Expand Up @@ -171,6 +173,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
Expand Down
2 changes: 1 addition & 1 deletion runner/internal/executor/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Executor interface {
GetHistory(timestamp int64) *schemas.PullResponse
GetJobLogsHistory() []schemas.LogEvent
GetJobWsLogsHistory() []schemas.LogEvent
GetRunnerState() string
Run(ctx context.Context) error
SetCodePath(codePath string)
Expand Down
23 changes: 21 additions & 2 deletions runner/internal/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

"github.com/creack/pty"
"github.com/dstackai/ansistrip"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any benefit having this in a separate repo vs adding to dstack?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ansistrip docs should mention what it does with non-ascii, non-utf-8 sequences.

"github.com/dstackai/dstack/runner/consts"
"github.com/dstackai/dstack/runner/internal/connections"
"github.com/dstackai/dstack/runner/internal/gerrors"
Expand All @@ -27,6 +28,18 @@ import (
"github.com/prometheus/procfs"
)

// TODO: Tune these parameters for optimal experience/performance
const (
// Output is flushed when the cursor doesn't move for this duration
AnsiStripFlushInterval = 500 * time.Millisecond

// Output is flushed regardless of cursor activity after this maximum delay
AnsiStripMaxDelay = 3 * time.Second

// Maximum buffer size for ansistrip
MaxBufferSize = 32 * 1024 // 32KB
)

type RunExecutor struct {
tempDir string
homeDir string
Expand All @@ -47,6 +60,7 @@ type RunExecutor struct {
state string
jobStateHistory []schemas.JobStateEvent
jobLogs *appendWriter
jobWsLogs *appendWriter
runnerLogs *appendWriter
timestamp *MonotonicTimestamp

Expand Down Expand Up @@ -86,6 +100,7 @@ func NewRunExecutor(tempDir string, homeDir string, workingDir string, sshPort i
state: WaitSubmit,
jobStateHistory: make([]schemas.JobStateEvent, 0),
jobLogs: newAppendWriter(mu, timestamp),
jobWsLogs: newAppendWriter(mu, timestamp),
runnerLogs: newAppendWriter(mu, timestamp),
timestamp: timestamp,

Expand Down Expand Up @@ -129,7 +144,9 @@ func (ex *RunExecutor) Run(ctx context.Context) (err error) {
}
}()

logger := io.MultiWriter(runnerLogFile, os.Stdout, ex.runnerLogs)
stripper := ansistrip.NewWriter(ex.runnerLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
defer stripper.Close()
logger := io.MultiWriter(runnerLogFile, os.Stdout, stripper)
ctx = log.WithLogger(ctx, log.NewEntry(logger, int(log.DefaultEntry.Logger.Level))) // todo loglevel
log.Info(ctx, "Run job", "log_level", log.GetLogger(ctx).Logger.Level.String())

Expand Down Expand Up @@ -431,11 +448,13 @@ func (ex *RunExecutor) execJob(ctx context.Context, jobLogFile io.Writer) error
defer func() { _ = ptm.Close() }()
defer func() { _ = cmd.Wait() }() // release resources if copy fails

logger := io.MultiWriter(jobLogFile, ex.jobLogs)
stripper := ansistrip.NewWriter(ex.jobLogs, AnsiStripFlushInterval, AnsiStripMaxDelay, MaxBufferSize)
logger := io.MultiWriter(jobLogFile, ex.jobWsLogs, stripper)
_, err = io.Copy(logger, ptm)
if err != nil && !isPtyError(err) {
return gerrors.Wrap(err)
}
stripper.Close()
Comment thread
peterschmidt85 marked this conversation as resolved.
Outdated
return gerrors.Wrap(cmd.Wait())
}

Expand Down
113 changes: 104 additions & 9 deletions runner/internal/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -17,8 +18,6 @@ import (
"github.com/stretchr/testify/require"
)

// todo test get history

func TestExecutor_WorkingDir_Current(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)
Expand All @@ -28,7 +27,8 @@ func TestExecutor_WorkingDir_Current(t *testing.T) {

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, ex.workingDir+"\r\n", b.String())
// Normalize line endings for cross-platform compatibility.
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_WorkingDir_Nil(t *testing.T) {
Expand All @@ -39,7 +39,7 @@ func TestExecutor_WorkingDir_Nil(t *testing.T) {

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, ex.workingDir+"\r\n", b.String())
assert.Equal(t, ex.workingDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_HomeDir(t *testing.T) {
Expand All @@ -49,7 +49,7 @@ func TestExecutor_HomeDir(t *testing.T) {

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, ex.homeDir+"\r\n", b.String())
assert.Equal(t, ex.homeDir+"\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_NonZeroExit(t *testing.T) {
Expand All @@ -61,7 +61,7 @@ func TestExecutor_NonZeroExit(t *testing.T) {
assert.Error(t, err)
assert.NotEmpty(t, ex.jobStateHistory)
exitStatus := ex.jobStateHistory[len(ex.jobStateHistory)-1].ExitStatus
assert.NotNil(t, exitStatus, ex.jobStateHistory)
assert.NotNil(t, exitStatus)
assert.Equal(t, 100, *exitStatus)
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func TestExecutor_LocalRepo(t *testing.T) {

err = ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
assert.Equal(t, "bar\r\n", b.String())
assert.Equal(t, "bar\n", strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

func TestExecutor_Recover(t *testing.T) {
Expand Down Expand Up @@ -148,8 +148,8 @@ func TestExecutor_RemoteRepo(t *testing.T) {

err = ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)
expected := fmt.Sprintf("%s\r\n%s\r\n%s\r\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
assert.Equal(t, expected, b.String())
expected := fmt.Sprintf("%s\n%s\n%s\n", ex.getRepoData().RepoHash, ex.getRepoData().RepoConfigName, ex.getRepoData().RepoConfigEmail)
assert.Equal(t, expected, strings.ReplaceAll(b.String(), "\r\n", "\n"))
}

/* Helpers */
Expand Down Expand Up @@ -236,3 +236,98 @@ func TestWriteDstackProfile(t *testing.T) {
assert.Equal(t, value, string(out))
}
}

func TestExecutor_Logs(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)
// Use printf to generate ANSI control codes.
// \033[31m = red text, \033[1;32m = bold green text, \033[0m = reset
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "printf '\\033[31mRed Hello World\\033[0m\\n' && printf '\\033[1;32mBold Green Line 2\\033[0m\\n' && printf 'Line 3\\n'")

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)

logHistory := ex.GetHistory(0).JobLogs
assert.NotEmpty(t, logHistory)

logString := combineLogMessages(logHistory)
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")

expectedOutput := "Red Hello World\nBold Green Line 2\nLine 3\n"
assert.Equal(t, expectedOutput, normalizedLogString, "Should strip ANSI codes from regular logs")

// Verify timestamps are in order
assert.Greater(t, len(logHistory), 0)
for i := 1; i < len(logHistory); i++ {
assert.GreaterOrEqual(t, logHistory[i].Timestamp, logHistory[i-1].Timestamp)
}
}

func TestExecutor_LogsWithErrors(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)
ex.jobSpec.Commands = append(ex.jobSpec.Commands, "echo 'Success message' && echo 'Error message' >&2 && exit 1")

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.Error(t, err)

logHistory := ex.GetHistory(0).JobLogs
assert.NotEmpty(t, logHistory)

logString := combineLogMessages(logHistory)
normalizedLogString := strings.ReplaceAll(logString, "\r\n", "\n")

expectedOutput := "Success message\nError message\n"
assert.Equal(t, expectedOutput, normalizedLogString)
}

func TestExecutor_LogsAnsiCodeHandling(t *testing.T) {
var b bytes.Buffer
ex := makeTestExecutor(t)

// Test a variety of ANSI escape sequences on stdout and stderr.
cmd := "printf '\\033[31mRed\\033[0m \\033[32mGreen\\033[0m\\n' && " +
"printf '\\033[1mBold\\033[0m \\033[4mUnderline\\033[0m\\n' && " +
"printf '\\033[s\\033[uPlain text\\n' >&2"

ex.jobSpec.Commands = append(ex.jobSpec.Commands, cmd)

err := ex.execJob(context.TODO(), io.Writer(&b))
assert.NoError(t, err)

// 1. Check WebSocket logs, which should preserve ANSI codes.
wsLogHistory := ex.GetJobWsLogsHistory()
assert.NotEmpty(t, wsLogHistory)
wsLogString := combineLogMessages(wsLogHistory)
normalizedWsLogString := strings.ReplaceAll(wsLogString, "\r\n", "\n")

expectedWsOutput := "\033[31mRed\033[0m \033[32mGreen\033[0m\n" +
"\033[1mBold\033[0m \033[4mUnderline\033[0m\n" +
"\033[s\033[uPlain text\n"
assert.Equal(t, expectedWsOutput, normalizedWsLogString, "Websocket logs should preserve ANSI codes")

// 2. Check regular job logs, which should have ANSI codes stripped.
regularLogHistory := ex.GetHistory(0).JobLogs
assert.NotEmpty(t, regularLogHistory)
regularLogString := combineLogMessages(regularLogHistory)
normalizedRegularLogString := strings.ReplaceAll(regularLogString, "\r\n", "\n")

expectedRegularOutput := "Red Green\n" +
"Bold Underline\n" +
"Plain text\n"
assert.Equal(t, expectedRegularOutput, normalizedRegularLogString, "Regular logs should have ANSI codes stripped")

// Verify timestamps are ordered for both log types.
assert.Greater(t, len(wsLogHistory), 0)
for i := 1; i < len(wsLogHistory); i++ {
assert.GreaterOrEqual(t, wsLogHistory[i].Timestamp, wsLogHistory[i-1].Timestamp)
}
}

func combineLogMessages(logHistory []schemas.LogEvent) string {
var logOutput bytes.Buffer
for _, logEvent := range logHistory {
logOutput.Write(logEvent.Message)
}
return logOutput.String()
}
4 changes: 2 additions & 2 deletions runner/internal/executor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"github.com/dstackai/dstack/runner/internal/schemas"
)

func (ex *RunExecutor) GetJobLogsHistory() []schemas.LogEvent {
return ex.jobLogs.history
func (ex *RunExecutor) GetJobWsLogsHistory() []schemas.LogEvent {
return ex.jobWsLogs.history
}

func (ex *RunExecutor) GetHistory(timestamp int64) *schemas.PullResponse {
Expand Down
10 changes: 5 additions & 5 deletions runner/internal/runner/api/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ func (s *Server) streamJobLogs(conn *websocket.Conn) {

for {
s.executor.RLock()
jobLogsHistory := s.executor.GetJobLogsHistory()
jobLogsWsHistory := s.executor.GetJobWsLogsHistory()
select {
case <-s.shutdownCh:
if currentPos >= len(jobLogsHistory) {
if currentPos >= len(jobLogsWsHistory) {
s.executor.RUnlock()
close(s.wsDoneCh)
return
}
default:
if currentPos >= len(jobLogsHistory) {
if currentPos >= len(jobLogsWsHistory) {
s.executor.RUnlock()
time.Sleep(100 * time.Millisecond)
continue
}
}
for currentPos < len(jobLogsHistory) {
if err := conn.WriteMessage(websocket.BinaryMessage, jobLogsHistory[currentPos].Message); err != nil {
for currentPos < len(jobLogsWsHistory) {
if err := conn.WriteMessage(websocket.BinaryMessage, jobLogsWsHistory[currentPos].Message); err != nil {
s.executor.RUnlock()
log.Error(context.TODO(), "Failed to write message", "err", err)
return
Expand Down
Loading
Loading