From 99a2f245a4334965e38f857bb9908d49eb4643eb Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Sat, 20 Jun 2026 11:10:13 +0000 Subject: [PATCH] fix(tasks): complete finalization on reconciler race and notify workflow on template stop When failTaskRunnerLost wins the cluster finalize lock but the DB already has a terminal status, complete finalization instead of returning early. This releases stale running/active pool state and persists End when the runner report lost the finalize race. StopTasksByTemplate bulk-stops waiting tasks without going through finishRun, so notify the workflow service for waiting workflow tasks. Also clear the dispatching flag when the dispatch goroutine exits and harden requeueTaskRunnerOffline against concurrent running reports. Co-authored-by: Denis Gukov --- services/tasks/TaskPool.go | 69 +++++++++++++++-- services/tasks/TaskPool_test.go | 96 ++++++++++++++++++++++++ services/tasks/runner_reconciler.go | 25 ++++++ services/tasks/runner_reconciler_test.go | 81 ++++++++++++++++++-- 4 files changed, 258 insertions(+), 13 deletions(-) diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index 44ded1b76..e235f01ca 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -408,6 +408,7 @@ func runTask(task *TaskRunner, p *TaskPool) { "task_name": task.Template.Name, }).Info("Task started") go func() { + defer task.dispatching.Store(false) time.Sleep(1 * time.Second) task.run() }() @@ -459,13 +460,14 @@ func (p *TaskPool) FinalizeRemoteTask(tsk *TaskRunner, runner *db.Runner) { func (p *TaskPool) finalizeRemoteTaskLocked(tsk *TaskRunner, runner *db.Runner) { if util.HAEnabled() { p.refreshTaskStatusFromDB(tsk) - if tsk.Task.End != nil { - // Another node may have persisted End before onTaskStop ran (e.g. - // crash between saveStatus and the queue drain). Release any stale - // shared pool state without re-running finish or autorun. - p.onTaskStop(tsk) - return - } + } + + if tsk.Task.End != nil { + // Another node may have persisted End before onTaskStop ran (e.g. + // crash between saveStatus and the queue drain). Release any stale + // shared pool state without re-running finish or autorun. + p.onTaskStop(tsk) + return } if runner != nil { @@ -710,6 +712,14 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop stoppedTasks := map[int]struct{}{} + // Snapshot waiting workflow tasks before bulk-stopping. Waiting tasks have no + // running goroutine, so they never reach finishRun and would otherwise leave + // their workflow run stuck in a non-terminal state. + waitingWorkflowTasks, err := p.getWaitingWorkflowTasks(projectID, templateID) + if err != nil { + log.Error(err) + } + // Bulk-update all waiting tasks in DB in a single query. // This is the fast path -- waiting tasks have no running process. if err := p.store.SetWaitingTasksToStopped(projectID, templateID); err != nil { @@ -812,7 +822,52 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop go p.FinalizeRemoteTask(tsk, nil) } else { tsk.createTaskEvent() + p.notifyWorkflowTaskStopped(tsk.Task) + } + } + + for _, twt := range waitingWorkflowTasks { + task, taskErr := p.store.GetTask(projectID, twt.ID) + if taskErr != nil { + log.WithError(taskErr).WithFields(log.Fields{ + "task_id": twt.ID, + "context": "task_pool", + }).Warn("can't reload stopped workflow task") + continue } + p.notifyWorkflowTaskStopped(task) + } +} + +func (p *TaskPool) getWaitingWorkflowTasks(projectID int, templateID int) ([]db.TaskWithTpl, error) { + tasks, err := p.store.GetTemplateTasks(projectID, templateID, db.RetrieveQueryParams{ + TaskFilter: &db.TaskFilter{ + Status: []task_logger.TaskStatus{task_logger.TaskWaitingStatus}, + }, + }) + if err != nil { + return nil, err + } + + waitingWorkflowTasks := make([]db.TaskWithTpl, 0, len(tasks)) + for _, task := range tasks { + if task.WorkflowRunID != nil { + waitingWorkflowTasks = append(waitingWorkflowTasks, task) + } + } + return waitingWorkflowTasks, nil +} + +func (p *TaskPool) notifyWorkflowTaskStopped(task db.Task) { + if task.WorkflowRunID == nil { + return + } + if err := p.HandleWorkflowTaskCompletion(task); err != nil { + log.WithError(err).WithFields(log.Fields{ + "task_id": task.ID, + "workflow_run_id": *task.WorkflowRunID, + "context": "task_pool", + }).Error("workflow progression failed after template stop") } } diff --git a/services/tasks/TaskPool_test.go b/services/tasks/TaskPool_test.go index 27d2870c3..621025254 100644 --- a/services/tasks/TaskPool_test.go +++ b/services/tasks/TaskPool_test.go @@ -9,11 +9,46 @@ import ( "github.com/semaphoreui/semaphore/db/sql" "github.com/semaphoreui/semaphore/pkg/task_logger" "github.com/semaphoreui/semaphore/pkg/tz" + "github.com/semaphoreui/semaphore/pro_interfaces" "github.com/semaphoreui/semaphore/util" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +type recordingWorkflowService struct { + mu sync.Mutex + completed []db.Task +} + +func (s *recordingWorkflowService) StartWorkflow(workflow db.WorkflowTemplate, user *db.User) (db.WorkflowRun, error) { + return db.WorkflowRun{}, nil +} + +func (s *recordingWorkflowService) ProgressWorkflowRun(projectID int, runID int, user *db.User) error { + return nil +} + +func (s *recordingWorkflowService) StopWorkflowRun(projectID int, runID int, user *db.User) (db.WorkflowRun, error) { + return db.WorkflowRun{}, nil +} + +func (s *recordingWorkflowService) ResolveWorkflowApproval(projectID int, workflowID int, runID int, nodeID int, status db.WorkflowApprovalStatus, user *db.User) (db.WorkflowApproval, error) { + return db.WorkflowApproval{}, nil +} + +func (s *recordingWorkflowService) HandleWorkflowTaskCompletion(task db.Task) error { + s.mu.Lock() + defer s.mu.Unlock() + s.completed = append(s.completed, task) + return nil +} + +func (s *recordingWorkflowService) GetWorkflowRunArtifacts(projectID int, runID int, currentTaskID *int) (map[string]any, error) { + return map[string]any{}, nil +} + +var _ pro_interfaces.WorkflowService = (*recordingWorkflowService)(nil) + type spyTaskStateStore struct { *MemoryTaskStateStore tryClaimCalls int @@ -210,3 +245,64 @@ func TestTaskPool_StopTasksByTemplate_DequeuesWaitingTasksByID(t *testing.T) { assert.Equal(t, 1, state.QueueLen(), "only the targeted template's waiting task should be dequeued") assert.Equal(t, keepMe.Task.ID, state.QueueGet(0).Task.ID) } + +func TestTaskPool_StopTasksByTemplate_NotifiesWorkflowOnWaitingTasks(t *testing.T) { + prevCfg := util.Config + t.Cleanup(func() { util.Config = prevCfg }) + util.Config = &util.ConfigType{MaxParallelTasks: 0} + + store := sql.CreateTestStore() + proj, err := store.CreateProject(db.Project{}) + require.NoError(t, err) + + key, err := store.CreateAccessKey(db.AccessKey{ProjectID: &proj.ID, Type: db.AccessKeyNone}) + require.NoError(t, err) + + repo, err := store.CreateRepository(db.Repository{ + ProjectID: proj.ID, + SSHKeyID: key.ID, + Name: "Test", + GitURL: "git@example.com:test/test", + GitBranch: "master", + }) + require.NoError(t, err) + + tpl, err := store.CreateTemplate(db.Template{ + Name: "workflow tpl", + Playbook: "test.yml", + ProjectID: proj.ID, + RepositoryID: repo.ID, + }) + require.NoError(t, err) + + runID := 42 + task, err := store.CreateTask(db.Task{ + ProjectID: proj.ID, + TemplateID: tpl.ID, + Status: task_logger.TaskWaitingStatus, + WorkflowRunID: &runID, + }, 0) + require.NoError(t, err) + + state := NewMemoryTaskStateStore() + workflowSvc := &recordingWorkflowService{} + pool := TaskPool{ + state: state, + store: store, + workflowService: workflowSvc, + } + + pool.StopTasksByTemplate(proj.ID, tpl.ID, true) + + stopped, err := store.GetTask(proj.ID, task.ID) + require.NoError(t, err) + assert.Equal(t, task_logger.TaskStoppedStatus, stopped.Status) + require.NotNil(t, stopped.End) + + workflowSvc.mu.Lock() + defer workflowSvc.mu.Unlock() + require.Len(t, workflowSvc.completed, 1) + assert.Equal(t, task.ID, workflowSvc.completed[0].ID) + assert.Equal(t, runID, *workflowSvc.completed[0].WorkflowRunID) + assert.Equal(t, task_logger.TaskStoppedStatus, workflowSvc.completed[0].Status) +} diff --git a/services/tasks/runner_reconciler.go b/services/tasks/runner_reconciler.go index c08ced66a..38910edad 100644 --- a/services/tasks/runner_reconciler.go +++ b/services/tasks/runner_reconciler.go @@ -195,6 +195,11 @@ func (p *TaskPool) failTaskRunnerLost(tsk *TaskRunner, runner *db.Runner, reason } if tsk.Task.Status.IsFinished() { + // Another node (or the runner report on this node) already persisted a + // terminal status. Complete finalization instead of bailing: if we won + // the finalize lock over FinalizeRemoteTask, that path will not run and + // pool/Redis state (running set, claims, End, autorun) would leak. + p.finalizeRemoteTaskLocked(tsk, runner) return } @@ -264,6 +269,22 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso tsk.Logf("Runner #%d lost the task: %s. Returning task to queue.", runnerID, reason) + // Re-check the DB immediately before mutating: another node may have + // received a concurrent "running" report while we held the finalize lock. + if util.HAEnabled() { + p.refreshTaskStatusFromDB(tsk) + if tsk.Task.Status != task_logger.TaskStartingStatus && + tsk.Task.Status != task_logger.TaskWaitingStatus { + return + } + if tsk.Task.RunnerID == nil || *tsk.Task.RunnerID != runnerID { + return + } + } + + prevRunnerID := tsk.Task.RunnerID + prevStatus := tsk.Task.Status + tsk.Task.RunnerID = nil tsk.SetStatus(task_logger.TaskWaitingStatus) @@ -274,6 +295,10 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso "task_id": tsk.Task.ID, "context": "runner_reconciler", }).Error("failed to persist requeued task") + // Roll back in-memory changes so the next reconcile tick retries via + // the runner-liveness path instead of mis-routing as undispatched. + tsk.Task.RunnerID = prevRunnerID + tsk.Task.Status = prevStatus return } diff --git a/services/tasks/runner_reconciler_test.go b/services/tasks/runner_reconciler_test.go index 22318261f..7937f3ff6 100644 --- a/services/tasks/runner_reconciler_test.go +++ b/services/tasks/runner_reconciler_test.go @@ -720,6 +720,9 @@ func TestRequeueTaskRunnerOffline_PersistError(t *testing.T) { // Persist failed: the task must not be enqueued (the old runner could // still pull it), and no requeue event must be emitted. + assert.NotNil(t, tsk.Task.RunnerID) + assert.Equal(t, runnerID, *tsk.Task.RunnerID) + assert.Equal(t, task_logger.TaskWaitingStatus, tsk.Task.Status) assert.Equal(t, 0, state.QueueLen()) assert.Empty(t, pool.queueEvents) } @@ -749,6 +752,34 @@ func TestRequeueTaskRunnerOffline_HA(t *testing.T) { assert.Equal(t, 1, state.QueueLen()) } +func TestRequeueTaskRunnerOffline_HA_SkipsWhenDBShowsRunning(t *testing.T) { + setupReconcilerConfig(t) + + store := sql.CreateTestStore() + util.Config.HA = &util.HAConfig{Enabled: true} + state := NewMemoryTaskStateStore() + pool := newReconcilerTestPool(store, state) + + newTask, runnerID := createReconcilerTestTask(t, store, task_logger.TaskStartingStatus, nil) + + // Another node persisted "running" while this node still has a stale "starting" copy. + running := newTask + running.Status = task_logger.TaskRunningStatus + require.NoError(t, store.UpdateTask(running)) + + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} + state.SetRunning(tsk) + + pool.requeueTaskRunnerOffline(tsk, runnerID, "runner is offline") + + assert.Equal(t, task_logger.TaskRunningStatus, tsk.Task.Status) + assert.NotNil(t, tsk.Task.RunnerID) + assert.Equal(t, runnerID, *tsk.Task.RunnerID) + assert.Equal(t, 0, state.QueueLen()) + assert.Empty(t, pool.queueEvents) +} + func TestFailTaskRunnerLost_HA(t *testing.T) { t.Run("DB row still running: task failed", func(t *testing.T) { setupReconcilerConfig(t) @@ -771,10 +802,9 @@ func TestFailTaskRunnerLost_HA(t *testing.T) { assert.NotNil(t, tsk.Task.End) }) - t.Run("DB row already finished: no-op", func(t *testing.T) { + t.Run("DB row already finished with End: releases stale pool state", func(t *testing.T) { setupReconcilerConfig(t) - // CreateTestStore replaces util.Config, so HA is enabled after it. store := sql.CreateTestStore() util.Config.HA = &util.HAConfig{Enabled: true} state := NewMemoryTaskStateStore() @@ -783,22 +813,61 @@ func TestFailTaskRunnerLost_HA(t *testing.T) { now := time.Now() newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now) - // Another node already finished the task in the DB. finished := newTask finished.Status = task_logger.TaskSuccessStatus finished.End = &now require.NoError(t, store.UpdateTask(finished)) - // Stale in-memory copy still says "running". - tsk := &TaskRunner{Task: newTask, pool: &pool} + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} state.SetRunning(tsk) + state.AddActive(tsk.Task.ProjectID, tsk) pool.failTaskRunnerLost(tsk, nil, "runner stopped responding") - // The DB refresh observed the terminal status: nothing was failed. assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status) + assert.Equal(t, 0, state.RunningCount()) + assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID)) assert.Empty(t, pool.queueEvents) }) + + t.Run("DB row finished without End: completes finalization", func(t *testing.T) { + setupReconcilerConfig(t) + + store := sql.CreateTestStore() + util.Config.HA = &util.HAConfig{Enabled: true} + state := NewMemoryTaskStateStore() + pool := newReconcilerTestPool(store, state) + + now := time.Now() + newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now) + + // Runner reported terminal success on another node but lost the finalize race. + reported := newTask + reported.Status = task_logger.TaskSuccessStatus + require.NoError(t, store.UpdateTask(reported)) + + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} + state.SetRunning(tsk) + state.AddActive(tsk.Task.ProjectID, tsk) + + pool.failTaskRunnerLost(tsk, nil, "runner stopped responding") + + assert.NotNil(t, tsk.Task.End) + assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status) + + select { + case ev := <-pool.queueEvents: + assert.Equal(t, EventTypeFinished, ev.eventType) + pool.onTaskStop(ev.task) + default: + t.Fatal("expected EventTypeFinished in queueEvents") + } + + assert.Equal(t, 0, state.RunningCount()) + assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID)) + }) } func TestRequeueUndispatchedTask(t *testing.T) {