diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index 44ded1b76..e235f01ca 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -408,6 +408,7 @@ func runTask(task *TaskRunner, p *TaskPool) { "task_name": task.Template.Name, }).Info("Task started") go func() { + defer task.dispatching.Store(false) time.Sleep(1 * time.Second) task.run() }() @@ -459,13 +460,14 @@ func (p *TaskPool) FinalizeRemoteTask(tsk *TaskRunner, runner *db.Runner) { func (p *TaskPool) finalizeRemoteTaskLocked(tsk *TaskRunner, runner *db.Runner) { if util.HAEnabled() { p.refreshTaskStatusFromDB(tsk) - if tsk.Task.End != nil { - // Another node may have persisted End before onTaskStop ran (e.g. - // crash between saveStatus and the queue drain). Release any stale - // shared pool state without re-running finish or autorun. - p.onTaskStop(tsk) - return - } + } + + if tsk.Task.End != nil { + // Another node may have persisted End before onTaskStop ran (e.g. + // crash between saveStatus and the queue drain). Release any stale + // shared pool state without re-running finish or autorun. + p.onTaskStop(tsk) + return } if runner != nil { @@ -710,6 +712,14 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop stoppedTasks := map[int]struct{}{} + // Snapshot waiting workflow tasks before bulk-stopping. Waiting tasks have no + // running goroutine, so they never reach finishRun and would otherwise leave + // their workflow run stuck in a non-terminal state. + waitingWorkflowTasks, err := p.getWaitingWorkflowTasks(projectID, templateID) + if err != nil { + log.Error(err) + } + // Bulk-update all waiting tasks in DB in a single query. // This is the fast path -- waiting tasks have no running process. if err := p.store.SetWaitingTasksToStopped(projectID, templateID); err != nil { @@ -812,7 +822,52 @@ func (p *TaskPool) StopTasksByTemplate(projectID int, templateID int, forceStop go p.FinalizeRemoteTask(tsk, nil) } else { tsk.createTaskEvent() + p.notifyWorkflowTaskStopped(tsk.Task) + } + } + + for _, twt := range waitingWorkflowTasks { + task, taskErr := p.store.GetTask(projectID, twt.ID) + if taskErr != nil { + log.WithError(taskErr).WithFields(log.Fields{ + "task_id": twt.ID, + "context": "task_pool", + }).Warn("can't reload stopped workflow task") + continue } + p.notifyWorkflowTaskStopped(task) + } +} + +func (p *TaskPool) getWaitingWorkflowTasks(projectID int, templateID int) ([]db.TaskWithTpl, error) { + tasks, err := p.store.GetTemplateTasks(projectID, templateID, db.RetrieveQueryParams{ + TaskFilter: &db.TaskFilter{ + Status: []task_logger.TaskStatus{task_logger.TaskWaitingStatus}, + }, + }) + if err != nil { + return nil, err + } + + waitingWorkflowTasks := make([]db.TaskWithTpl, 0, len(tasks)) + for _, task := range tasks { + if task.WorkflowRunID != nil { + waitingWorkflowTasks = append(waitingWorkflowTasks, task) + } + } + return waitingWorkflowTasks, nil +} + +func (p *TaskPool) notifyWorkflowTaskStopped(task db.Task) { + if task.WorkflowRunID == nil { + return + } + if err := p.HandleWorkflowTaskCompletion(task); err != nil { + log.WithError(err).WithFields(log.Fields{ + "task_id": task.ID, + "workflow_run_id": *task.WorkflowRunID, + "context": "task_pool", + }).Error("workflow progression failed after template stop") } } diff --git a/services/tasks/TaskPool_test.go b/services/tasks/TaskPool_test.go index 27d2870c3..621025254 100644 --- a/services/tasks/TaskPool_test.go +++ b/services/tasks/TaskPool_test.go @@ -9,11 +9,46 @@ import ( "github.com/semaphoreui/semaphore/db/sql" "github.com/semaphoreui/semaphore/pkg/task_logger" "github.com/semaphoreui/semaphore/pkg/tz" + "github.com/semaphoreui/semaphore/pro_interfaces" "github.com/semaphoreui/semaphore/util" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +type recordingWorkflowService struct { + mu sync.Mutex + completed []db.Task +} + +func (s *recordingWorkflowService) StartWorkflow(workflow db.WorkflowTemplate, user *db.User) (db.WorkflowRun, error) { + return db.WorkflowRun{}, nil +} + +func (s *recordingWorkflowService) ProgressWorkflowRun(projectID int, runID int, user *db.User) error { + return nil +} + +func (s *recordingWorkflowService) StopWorkflowRun(projectID int, runID int, user *db.User) (db.WorkflowRun, error) { + return db.WorkflowRun{}, nil +} + +func (s *recordingWorkflowService) ResolveWorkflowApproval(projectID int, workflowID int, runID int, nodeID int, status db.WorkflowApprovalStatus, user *db.User) (db.WorkflowApproval, error) { + return db.WorkflowApproval{}, nil +} + +func (s *recordingWorkflowService) HandleWorkflowTaskCompletion(task db.Task) error { + s.mu.Lock() + defer s.mu.Unlock() + s.completed = append(s.completed, task) + return nil +} + +func (s *recordingWorkflowService) GetWorkflowRunArtifacts(projectID int, runID int, currentTaskID *int) (map[string]any, error) { + return map[string]any{}, nil +} + +var _ pro_interfaces.WorkflowService = (*recordingWorkflowService)(nil) + type spyTaskStateStore struct { *MemoryTaskStateStore tryClaimCalls int @@ -210,3 +245,64 @@ func TestTaskPool_StopTasksByTemplate_DequeuesWaitingTasksByID(t *testing.T) { assert.Equal(t, 1, state.QueueLen(), "only the targeted template's waiting task should be dequeued") assert.Equal(t, keepMe.Task.ID, state.QueueGet(0).Task.ID) } + +func TestTaskPool_StopTasksByTemplate_NotifiesWorkflowOnWaitingTasks(t *testing.T) { + prevCfg := util.Config + t.Cleanup(func() { util.Config = prevCfg }) + util.Config = &util.ConfigType{MaxParallelTasks: 0} + + store := sql.CreateTestStore() + proj, err := store.CreateProject(db.Project{}) + require.NoError(t, err) + + key, err := store.CreateAccessKey(db.AccessKey{ProjectID: &proj.ID, Type: db.AccessKeyNone}) + require.NoError(t, err) + + repo, err := store.CreateRepository(db.Repository{ + ProjectID: proj.ID, + SSHKeyID: key.ID, + Name: "Test", + GitURL: "git@example.com:test/test", + GitBranch: "master", + }) + require.NoError(t, err) + + tpl, err := store.CreateTemplate(db.Template{ + Name: "workflow tpl", + Playbook: "test.yml", + ProjectID: proj.ID, + RepositoryID: repo.ID, + }) + require.NoError(t, err) + + runID := 42 + task, err := store.CreateTask(db.Task{ + ProjectID: proj.ID, + TemplateID: tpl.ID, + Status: task_logger.TaskWaitingStatus, + WorkflowRunID: &runID, + }, 0) + require.NoError(t, err) + + state := NewMemoryTaskStateStore() + workflowSvc := &recordingWorkflowService{} + pool := TaskPool{ + state: state, + store: store, + workflowService: workflowSvc, + } + + pool.StopTasksByTemplate(proj.ID, tpl.ID, true) + + stopped, err := store.GetTask(proj.ID, task.ID) + require.NoError(t, err) + assert.Equal(t, task_logger.TaskStoppedStatus, stopped.Status) + require.NotNil(t, stopped.End) + + workflowSvc.mu.Lock() + defer workflowSvc.mu.Unlock() + require.Len(t, workflowSvc.completed, 1) + assert.Equal(t, task.ID, workflowSvc.completed[0].ID) + assert.Equal(t, runID, *workflowSvc.completed[0].WorkflowRunID) + assert.Equal(t, task_logger.TaskStoppedStatus, workflowSvc.completed[0].Status) +} diff --git a/services/tasks/runner_reconciler.go b/services/tasks/runner_reconciler.go index c08ced66a..38910edad 100644 --- a/services/tasks/runner_reconciler.go +++ b/services/tasks/runner_reconciler.go @@ -195,6 +195,11 @@ func (p *TaskPool) failTaskRunnerLost(tsk *TaskRunner, runner *db.Runner, reason } if tsk.Task.Status.IsFinished() { + // Another node (or the runner report on this node) already persisted a + // terminal status. Complete finalization instead of bailing: if we won + // the finalize lock over FinalizeRemoteTask, that path will not run and + // pool/Redis state (running set, claims, End, autorun) would leak. + p.finalizeRemoteTaskLocked(tsk, runner) return } @@ -264,6 +269,22 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso tsk.Logf("Runner #%d lost the task: %s. Returning task to queue.", runnerID, reason) + // Re-check the DB immediately before mutating: another node may have + // received a concurrent "running" report while we held the finalize lock. + if util.HAEnabled() { + p.refreshTaskStatusFromDB(tsk) + if tsk.Task.Status != task_logger.TaskStartingStatus && + tsk.Task.Status != task_logger.TaskWaitingStatus { + return + } + if tsk.Task.RunnerID == nil || *tsk.Task.RunnerID != runnerID { + return + } + } + + prevRunnerID := tsk.Task.RunnerID + prevStatus := tsk.Task.Status + tsk.Task.RunnerID = nil tsk.SetStatus(task_logger.TaskWaitingStatus) @@ -274,6 +295,10 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso "task_id": tsk.Task.ID, "context": "runner_reconciler", }).Error("failed to persist requeued task") + // Roll back in-memory changes so the next reconcile tick retries via + // the runner-liveness path instead of mis-routing as undispatched. + tsk.Task.RunnerID = prevRunnerID + tsk.Task.Status = prevStatus return } diff --git a/services/tasks/runner_reconciler_test.go b/services/tasks/runner_reconciler_test.go index 22318261f..7937f3ff6 100644 --- a/services/tasks/runner_reconciler_test.go +++ b/services/tasks/runner_reconciler_test.go @@ -720,6 +720,9 @@ func TestRequeueTaskRunnerOffline_PersistError(t *testing.T) { // Persist failed: the task must not be enqueued (the old runner could // still pull it), and no requeue event must be emitted. + assert.NotNil(t, tsk.Task.RunnerID) + assert.Equal(t, runnerID, *tsk.Task.RunnerID) + assert.Equal(t, task_logger.TaskWaitingStatus, tsk.Task.Status) assert.Equal(t, 0, state.QueueLen()) assert.Empty(t, pool.queueEvents) } @@ -749,6 +752,34 @@ func TestRequeueTaskRunnerOffline_HA(t *testing.T) { assert.Equal(t, 1, state.QueueLen()) } +func TestRequeueTaskRunnerOffline_HA_SkipsWhenDBShowsRunning(t *testing.T) { + setupReconcilerConfig(t) + + store := sql.CreateTestStore() + util.Config.HA = &util.HAConfig{Enabled: true} + state := NewMemoryTaskStateStore() + pool := newReconcilerTestPool(store, state) + + newTask, runnerID := createReconcilerTestTask(t, store, task_logger.TaskStartingStatus, nil) + + // Another node persisted "running" while this node still has a stale "starting" copy. + running := newTask + running.Status = task_logger.TaskRunningStatus + require.NoError(t, store.UpdateTask(running)) + + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} + state.SetRunning(tsk) + + pool.requeueTaskRunnerOffline(tsk, runnerID, "runner is offline") + + assert.Equal(t, task_logger.TaskRunningStatus, tsk.Task.Status) + assert.NotNil(t, tsk.Task.RunnerID) + assert.Equal(t, runnerID, *tsk.Task.RunnerID) + assert.Equal(t, 0, state.QueueLen()) + assert.Empty(t, pool.queueEvents) +} + func TestFailTaskRunnerLost_HA(t *testing.T) { t.Run("DB row still running: task failed", func(t *testing.T) { setupReconcilerConfig(t) @@ -771,10 +802,9 @@ func TestFailTaskRunnerLost_HA(t *testing.T) { assert.NotNil(t, tsk.Task.End) }) - t.Run("DB row already finished: no-op", func(t *testing.T) { + t.Run("DB row already finished with End: releases stale pool state", func(t *testing.T) { setupReconcilerConfig(t) - // CreateTestStore replaces util.Config, so HA is enabled after it. store := sql.CreateTestStore() util.Config.HA = &util.HAConfig{Enabled: true} state := NewMemoryTaskStateStore() @@ -783,22 +813,61 @@ func TestFailTaskRunnerLost_HA(t *testing.T) { now := time.Now() newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now) - // Another node already finished the task in the DB. finished := newTask finished.Status = task_logger.TaskSuccessStatus finished.End = &now require.NoError(t, store.UpdateTask(finished)) - // Stale in-memory copy still says "running". - tsk := &TaskRunner{Task: newTask, pool: &pool} + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} state.SetRunning(tsk) + state.AddActive(tsk.Task.ProjectID, tsk) pool.failTaskRunnerLost(tsk, nil, "runner stopped responding") - // The DB refresh observed the terminal status: nothing was failed. assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status) + assert.Equal(t, 0, state.RunningCount()) + assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID)) assert.Empty(t, pool.queueEvents) }) + + t.Run("DB row finished without End: completes finalization", func(t *testing.T) { + setupReconcilerConfig(t) + + store := sql.CreateTestStore() + util.Config.HA = &util.HAConfig{Enabled: true} + state := NewMemoryTaskStateStore() + pool := newReconcilerTestPool(store, state) + + now := time.Now() + newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now) + + // Runner reported terminal success on another node but lost the finalize race. + reported := newTask + reported.Status = task_logger.TaskSuccessStatus + require.NoError(t, store.UpdateTask(reported)) + + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} + state.SetRunning(tsk) + state.AddActive(tsk.Task.ProjectID, tsk) + + pool.failTaskRunnerLost(tsk, nil, "runner stopped responding") + + assert.NotNil(t, tsk.Task.End) + assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status) + + select { + case ev := <-pool.queueEvents: + assert.Equal(t, EventTypeFinished, ev.eventType) + pool.onTaskStop(ev.task) + default: + t.Fatal("expected EventTypeFinished in queueEvents") + } + + assert.Equal(t, 0, state.RunningCount()) + assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID)) + }) } func TestRequeueUndispatchedTask(t *testing.T) {