Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 62 additions & 7 deletions services/tasks/TaskPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ func runTask(task *TaskRunner, p *TaskPool) {
"task_name": task.Template.Name,
}).Info("Task started")
go func() {
defer task.dispatching.Store(false)
time.Sleep(1 * time.Second)
task.run()
}()
Expand Down Expand Up @@ -459,13 +460,14 @@ func (p *TaskPool) FinalizeRemoteTask(tsk *TaskRunner, runner *db.Runner) {
func (p *TaskPool) finalizeRemoteTaskLocked(tsk *TaskRunner, runner *db.Runner) {
if util.HAEnabled() {
p.refreshTaskStatusFromDB(tsk)
if tsk.Task.End != nil {
// Another node may have persisted End before onTaskStop ran (e.g.
// crash between saveStatus and the queue drain). Release any stale
// shared pool state without re-running finish or autorun.
p.onTaskStop(tsk)
return
}
}

if tsk.Task.End != nil {
// Another node may have persisted End before onTaskStop ran (e.g.
// crash between saveStatus and the queue drain). Release any stale
// shared pool state without re-running finish or autorun.
p.onTaskStop(tsk)
return
}

if runner != nil {
Expand Down Expand Up @@ -710,6 +712,14 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop

stoppedTasks := map[int]struct{}{}

// Snapshot waiting workflow tasks before bulk-stopping. Waiting tasks have no
// running goroutine, so they never reach finishRun and would otherwise leave
// their workflow run stuck in a non-terminal state.
waitingWorkflowTasks, err := p.getWaitingWorkflowTasks(projectID, templateID)
if err != nil {
log.Error(err)
}

// Bulk-update all waiting tasks in DB in a single query.
// This is the fast path -- waiting tasks have no running process.
if err := p.store.SetWaitingTasksToStopped(projectID, templateID); err != nil {
Expand Down Expand Up @@ -812,7 +822,52 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop
go p.FinalizeRemoteTask(tsk, nil)
} else {
tsk.createTaskEvent()
p.notifyWorkflowTaskStopped(tsk.Task)
}
}

for _, twt := range waitingWorkflowTasks {
task, taskErr := p.store.GetTask(projectID, twt.ID)
if taskErr != nil {
log.WithError(taskErr).WithFields(log.Fields{
"task_id": twt.ID,
"context": "task_pool",
}).Warn("can't reload stopped workflow task")
continue
}
p.notifyWorkflowTaskStopped(task)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Guard workflow notifications on a terminal stop

When SetWaitingTasksToStopped fails, or when a snapped task no longer matches the waiting update, this newly added call still reloads the task and passes it to HandleWorkflowTaskCompletion without checking that Status is actually stopped. In that context a workflow node can advance from a task that remains waiting/running; only notify after confirming the reload is terminal and was actually stopped.

Useful? React with 👍 / 👎.

}
}

func (p *TaskPool) getWaitingWorkflowTasks(projectID int, templateID int) ([]db.TaskWithTpl, error) {
tasks, err := p.store.GetTemplateTasks(projectID, templateID, db.RetrieveQueryParams{
TaskFilter: &db.TaskFilter{
Status: []task_logger.TaskStatus{task_logger.TaskWaitingStatus},
},
})
if err != nil {
return nil, err
}

waitingWorkflowTasks := make([]db.TaskWithTpl, 0, len(tasks))
for _, task := range tasks {
if task.WorkflowRunID != nil {
waitingWorkflowTasks = append(waitingWorkflowTasks, task)
}
}
return waitingWorkflowTasks, nil
}

func (p *TaskPool) notifyWorkflowTaskStopped(task db.Task) {
if task.WorkflowRunID == nil {
return
}
if err := p.HandleWorkflowTaskCompletion(task); err != nil {
log.WithError(err).WithFields(log.Fields{
"task_id": task.ID,
"workflow_run_id": *task.WorkflowRunID,
"context": "task_pool",
}).Error("workflow progression failed after template stop")
}
}

Expand Down
96 changes: 96 additions & 0 deletions services/tasks/TaskPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,46 @@ import (
"github.com/semaphoreui/semaphore/db/sql"
"github.com/semaphoreui/semaphore/pkg/task_logger"
"github.com/semaphoreui/semaphore/pkg/tz"
"github.com/semaphoreui/semaphore/pro_interfaces"
"github.com/semaphoreui/semaphore/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type recordingWorkflowService struct {
mu sync.Mutex
completed []db.Task
}

func (s *recordingWorkflowService) StartWorkflow(workflow db.WorkflowTemplate, user *db.User) (db.WorkflowRun, error) {
return db.WorkflowRun{}, nil
}

func (s *recordingWorkflowService) ProgressWorkflowRun(projectID int, runID int, user *db.User) error {
return nil
}

func (s *recordingWorkflowService) StopWorkflowRun(projectID int, runID int, user *db.User) (db.WorkflowRun, error) {
return db.WorkflowRun{}, nil
}

func (s *recordingWorkflowService) ResolveWorkflowApproval(projectID int, workflowID int, runID int, nodeID int, status db.WorkflowApprovalStatus, user *db.User) (db.WorkflowApproval, error) {
return db.WorkflowApproval{}, nil
}

func (s *recordingWorkflowService) HandleWorkflowTaskCompletion(task db.Task) error {
s.mu.Lock()
defer s.mu.Unlock()
s.completed = append(s.completed, task)
return nil
}

func (s *recordingWorkflowService) GetWorkflowRunArtifacts(projectID int, runID int, currentTaskID *int) (map[string]any, error) {
return map[string]any{}, nil
}

var _ pro_interfaces.WorkflowService = (*recordingWorkflowService)(nil)

type spyTaskStateStore struct {
*MemoryTaskStateStore
tryClaimCalls int
Expand Down Expand Up @@ -210,3 +245,64 @@ func TestTaskPool_StopTasksByTemplate_DequeuesWaitingTasksByID(t *testing.T) {
assert.Equal(t, 1, state.QueueLen(), "only the targeted template's waiting task should be dequeued")
assert.Equal(t, keepMe.Task.ID, state.QueueGet(0).Task.ID)
}

func TestTaskPool_StopTasksByTemplate_NotifiesWorkflowOnWaitingTasks(t *testing.T) {
prevCfg := util.Config
t.Cleanup(func() { util.Config = prevCfg })
util.Config = &util.ConfigType{MaxParallelTasks: 0}

store := sql.CreateTestStore()
proj, err := store.CreateProject(db.Project{})
require.NoError(t, err)

key, err := store.CreateAccessKey(db.AccessKey{ProjectID: &proj.ID, Type: db.AccessKeyNone})
require.NoError(t, err)

repo, err := store.CreateRepository(db.Repository{
ProjectID: proj.ID,
SSHKeyID: key.ID,
Name: "Test",
GitURL: "git@example.com:test/test",
GitBranch: "master",
})
require.NoError(t, err)

tpl, err := store.CreateTemplate(db.Template{
Name: "workflow tpl",
Playbook: "test.yml",
ProjectID: proj.ID,
RepositoryID: repo.ID,
})
require.NoError(t, err)

runID := 42
task, err := store.CreateTask(db.Task{
ProjectID: proj.ID,
TemplateID: tpl.ID,
Status: task_logger.TaskWaitingStatus,
WorkflowRunID: &runID,
}, 0)
require.NoError(t, err)

state := NewMemoryTaskStateStore()
workflowSvc := &recordingWorkflowService{}
pool := TaskPool{
state: state,
store: store,
workflowService: workflowSvc,
}

pool.StopTasksByTemplate(proj.ID, tpl.ID, true)

stopped, err := store.GetTask(proj.ID, task.ID)
require.NoError(t, err)
assert.Equal(t, task_logger.TaskStoppedStatus, stopped.Status)
require.NotNil(t, stopped.End)

workflowSvc.mu.Lock()
defer workflowSvc.mu.Unlock()
require.Len(t, workflowSvc.completed, 1)
assert.Equal(t, task.ID, workflowSvc.completed[0].ID)
assert.Equal(t, runID, *workflowSvc.completed[0].WorkflowRunID)
assert.Equal(t, task_logger.TaskStoppedStatus, workflowSvc.completed[0].Status)
}
25 changes: 25 additions & 0 deletions services/tasks/runner_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ func (p *TaskPool) failTaskRunnerLost(tsk *TaskRunner, runner *db.Runner, reason
}

if tsk.Task.Status.IsFinished() {
// Another node (or the runner report on this node) already persisted a
// terminal status. Complete finalization instead of bailing: if we won
// the finalize lock over FinalizeRemoteTask, that path will not run and
// pool/Redis state (running set, claims, End, autorun) would leak.
p.finalizeRemoteTaskLocked(tsk, runner)
return
}

Expand Down Expand Up @@ -264,6 +269,22 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso

tsk.Logf("Runner #%d lost the task: %s. Returning task to queue.", runnerID, reason)

// Re-check the DB immediately before mutating: another node may have
// received a concurrent "running" report while we held the finalize lock.
if util.HAEnabled() {
p.refreshTaskStatusFromDB(tsk)
if tsk.Task.Status != task_logger.TaskStartingStatus &&
tsk.Task.Status != task_logger.TaskWaitingStatus {
return
Comment on lines +276 to +278

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Finalize terminal reports seen by the requeue recheck

In HA, if this second DB refresh observes success/error/stopped because the runner reported a terminal status while this goroutine holds TryFinalize, this branch just returns. The runner report's FinalizeRemoteTask loses that same finalize lock and exits, so no later path writes End, emits workflow completion, or releases running/claim state; handle finished statuses here like failTaskRunnerLost instead of treating them like running.

Useful? React with 👍 / 👎.

}
if tsk.Task.RunnerID == nil || *tsk.Task.RunnerID != runnerID {
return
}
}

prevRunnerID := tsk.Task.RunnerID
prevStatus := tsk.Task.Status

tsk.Task.RunnerID = nil
tsk.SetStatus(task_logger.TaskWaitingStatus)

Expand All @@ -274,6 +295,10 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso
"task_id": tsk.Task.ID,
"context": "runner_reconciler",
}).Error("failed to persist requeued task")
// Roll back in-memory changes so the next reconcile tick retries via
// the runner-liveness path instead of mis-routing as undispatched.
tsk.Task.RunnerID = prevRunnerID
tsk.Task.Status = prevStatus
return
}

Expand Down
81 changes: 75 additions & 6 deletions services/tasks/runner_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,9 @@ func TestRequeueTaskRunnerOffline_PersistError(t *testing.T) {

// Persist failed: the task must not be enqueued (the old runner could
// still pull it), and no requeue event must be emitted.
assert.NotNil(t, tsk.Task.RunnerID)
assert.Equal(t, runnerID, *tsk.Task.RunnerID)
assert.Equal(t, task_logger.TaskWaitingStatus, tsk.Task.Status)
assert.Equal(t, 0, state.QueueLen())
assert.Empty(t, pool.queueEvents)
}
Expand Down Expand Up @@ -749,6 +752,34 @@ func TestRequeueTaskRunnerOffline_HA(t *testing.T) {
assert.Equal(t, 1, state.QueueLen())
}

func TestRequeueTaskRunnerOffline_HA_SkipsWhenDBShowsRunning(t *testing.T) {
setupReconcilerConfig(t)

store := sql.CreateTestStore()
util.Config.HA = &util.HAConfig{Enabled: true}
state := NewMemoryTaskStateStore()
pool := newReconcilerTestPool(store, state)

newTask, runnerID := createReconcilerTestTask(t, store, task_logger.TaskStartingStatus, nil)

// Another node persisted "running" while this node still has a stale "starting" copy.
running := newTask
running.Status = task_logger.TaskRunningStatus
require.NoError(t, store.UpdateTask(running))

staleTask := newTask
tsk := &TaskRunner{Task: staleTask, pool: &pool}
state.SetRunning(tsk)

pool.requeueTaskRunnerOffline(tsk, runnerID, "runner is offline")

assert.Equal(t, task_logger.TaskRunningStatus, tsk.Task.Status)
assert.NotNil(t, tsk.Task.RunnerID)
assert.Equal(t, runnerID, *tsk.Task.RunnerID)
assert.Equal(t, 0, state.QueueLen())
assert.Empty(t, pool.queueEvents)
}

func TestFailTaskRunnerLost_HA(t *testing.T) {
t.Run("DB row still running: task failed", func(t *testing.T) {
setupReconcilerConfig(t)
Expand All @@ -771,10 +802,9 @@ func TestFailTaskRunnerLost_HA(t *testing.T) {
assert.NotNil(t, tsk.Task.End)
})

t.Run("DB row already finished: no-op", func(t *testing.T) {
t.Run("DB row already finished with End: releases stale pool state", func(t *testing.T) {
setupReconcilerConfig(t)

// CreateTestStore replaces util.Config, so HA is enabled after it.
store := sql.CreateTestStore()
util.Config.HA = &util.HAConfig{Enabled: true}
state := NewMemoryTaskStateStore()
Expand All @@ -783,22 +813,61 @@ func TestFailTaskRunnerLost_HA(t *testing.T) {
now := time.Now()
newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now)

// Another node already finished the task in the DB.
finished := newTask
finished.Status = task_logger.TaskSuccessStatus
finished.End = &now
require.NoError(t, store.UpdateTask(finished))

// Stale in-memory copy still says "running".
tsk := &TaskRunner{Task: newTask, pool: &pool}
staleTask := newTask
tsk := &TaskRunner{Task: staleTask, pool: &pool}
state.SetRunning(tsk)
state.AddActive(tsk.Task.ProjectID, tsk)

pool.failTaskRunnerLost(tsk, nil, "runner stopped responding")

// The DB refresh observed the terminal status: nothing was failed.
assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status)
assert.Equal(t, 0, state.RunningCount())
assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID))
assert.Empty(t, pool.queueEvents)
})

t.Run("DB row finished without End: completes finalization", func(t *testing.T) {
setupReconcilerConfig(t)

store := sql.CreateTestStore()
util.Config.HA = &util.HAConfig{Enabled: true}
state := NewMemoryTaskStateStore()
pool := newReconcilerTestPool(store, state)

now := time.Now()
newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now)

// Runner reported terminal success on another node but lost the finalize race.
reported := newTask
reported.Status = task_logger.TaskSuccessStatus
require.NoError(t, store.UpdateTask(reported))

staleTask := newTask
tsk := &TaskRunner{Task: staleTask, pool: &pool}
state.SetRunning(tsk)
state.AddActive(tsk.Task.ProjectID, tsk)

pool.failTaskRunnerLost(tsk, nil, "runner stopped responding")

assert.NotNil(t, tsk.Task.End)
assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status)

select {
case ev := <-pool.queueEvents:
assert.Equal(t, EventTypeFinished, ev.eventType)
pool.onTaskStop(ev.task)
default:
t.Fatal("expected EventTypeFinished in queueEvents")
}

assert.Equal(t, 0, state.RunningCount())
assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID))
})
}

func TestRequeueUndispatchedTask(t *testing.T) {
Expand Down
Loading