Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 50 additions & 40 deletions services/runners/job_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,46 +321,7 @@ func (p *JobPool) Run() {
}).Debug("Running job")

err := running.job.Run(t.username, t.incomingVersion, t.alias)

if err != nil {

log.WithFields(log.Fields{
"context": "job_running",
"task_id": t.taskID,
"task_status": t.status,
}).WithError(err).Error("launch job failed")

running.Log("Unable to launch the application. Please contact your system administrator for assistance.")

if running.getStatus() == task_logger.TaskStoppingStatus {
running.SetStatus(task_logger.TaskStoppedStatus)
} else {
running.SetStatus(task_logger.TaskFailStatus)
}
} else {

log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
"status": string(running.getStatus()),
}).Debug("Job run returned")

if running.getStatus().IsFinished() {
return
}

if running.getStatus() == task_logger.TaskStoppingStatus {
running.SetStatus(task_logger.TaskStoppedStatus)
} else {
running.SetStatus(task_logger.TaskSuccessStatus)
}
}

log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
"status": string(running.getStatus()),
}).Info("Task finished")
running.handleJobRunComplete(err)
}(rj)

log.WithFields(log.Fields{
Expand Down Expand Up @@ -526,6 +487,55 @@ func (p *JobPool) sendProgress() (ok bool) {
return
}

// handleJobRunComplete applies the terminal status transition after Executor.Run
// returns. Run often returns a non-nil error when the job was stopped or killed
// mid-flight; in that case the executor (or a concurrent stop/kill) may already
// have moved the status to a finished state and it must not be overwritten.
func (running *runningJob) handleJobRunComplete(err error) {
if running.getStatus().IsFinished() {
log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
"status": string(running.getStatus()),
}).Debug("Job run returned")
return
}

if err != nil {
log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
"task_status": running.getStatus(),
}).WithError(err).Error("launch job failed")

running.Log("Unable to launch the application. Please contact your system administrator for assistance.")

if running.getStatus() == task_logger.TaskStoppingStatus {
running.SetStatus(task_logger.TaskStoppedStatus)
} else {
running.SetStatus(task_logger.TaskFailStatus)
}
} else {
log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
"status": string(running.getStatus()),
}).Debug("Job run returned")

if running.getStatus() == task_logger.TaskStoppingStatus {
running.SetStatus(task_logger.TaskStoppedStatus)
} else {
running.SetStatus(task_logger.TaskSuccessStatus)
}
}

log.WithFields(log.Fields{
"context": "job_running",
"task_id": running.taskID,
"status": string(running.getStatus()),
}).Info("Task finished")
}

// applyTerminatedJobs emergency-stops jobs the server no longer accepts
// results for — the task reached a terminal status on the server (e.g. force
// stopped) while the runner was offline. The job's process is killed and the
Expand Down
54 changes: 54 additions & 0 deletions services/runners/job_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runners

import (
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"sync"
Expand Down Expand Up @@ -85,6 +86,59 @@ func TestJobPool_RunningJobsLifecycle(t *testing.T) {
assert.Nil(t, p.getRunningJob(1))
}

func TestHandleJobRunComplete_PreservesFinishedStatusOnError(t *testing.T) {
// When the server already moved the job to stopped (e.g. user stop confirmed
// via checkNewJobs) and Executor.Run returns an error from the killed process,
// the stopped status must not be overwritten with failed.
rj := &runningJob{
taskID: 1,
job: &tasks.LocalExecutor{Task: db.Task{ID: 1}},
status: task_logger.TaskStoppedStatus,
}
rj.handleJobRunComplete(errors.New("signal: killed"))

assert.Equal(t, task_logger.TaskStoppedStatus, rj.getStatus())
}

func TestHandleJobRunComplete_StoppingBecomesStoppedOnError(t *testing.T) {
lj := &tasks.LocalExecutor{Task: db.Task{ID: 2}}
rj := &runningJob{
taskID: 2,
job: lj,
status: task_logger.TaskStoppingStatus,
}
lj.Logger = rj
rj.handleJobRunComplete(errors.New("prepare failed"))

assert.Equal(t, task_logger.TaskStoppedStatus, rj.getStatus())
}

func TestHandleJobRunComplete_RunningBecomesFailedOnError(t *testing.T) {
lj := &tasks.LocalExecutor{Task: db.Task{ID: 3}}
rj := &runningJob{
taskID: 3,
job: lj,
status: task_logger.TaskRunningStatus,
}
lj.Logger = rj
rj.handleJobRunComplete(errors.New("prepare failed"))

assert.Equal(t, task_logger.TaskFailStatus, rj.getStatus())
}

func TestHandleJobRunComplete_RunningBecomesSuccess(t *testing.T) {
lj := &tasks.LocalExecutor{Task: db.Task{ID: 4}}
rj := &runningJob{
taskID: 4,
job: lj,
status: task_logger.TaskRunningStatus,
}
lj.Logger = rj
rj.handleJobRunComplete(nil)

assert.Equal(t, task_logger.TaskSuccessStatus, rj.getStatus())
}

func TestJobPool_HasRunningJobs(t *testing.T) {
p := NewJobPool(nil)
assert.False(t, p.hasRunningJobs())
Expand Down
Loading