From aca2a3fb912e7de31a796628faa0819e07c37d8c Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 15 Jun 2026 11:13:28 +0000 Subject: [PATCH] fix(tasks): complete finalization when reconciler loses race to runner report When failTaskRunnerLost wins the cluster finalize lock over FinalizeRemoteTask but the DB already has a terminal status, the early return leaked running/active pool state and skipped End, autorun, and workflow progression. Also harden requeueTaskRunnerOffline: re-check DB before mutating to avoid requeueing a concurrently running task, and roll back in-memory state when persist fails so the next reconcile retries correctly. Make finalizeRemoteTaskLocked idempotent when End is already set in non-HA mode as well. Co-authored-by: Denis Gukov --- services/tasks/TaskPool.go | 15 +++-- services/tasks/runner_reconciler.go | 25 ++++++++ services/tasks/runner_reconciler_test.go | 81 ++++++++++++++++++++++-- 3 files changed, 108 insertions(+), 13 deletions(-) diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index 44ded1b76..07ff9c5c0 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -459,13 +459,14 @@ func (p *TaskPool) FinalizeRemoteTask(tsk *TaskRunner, runner *db.Runner) { func (p *TaskPool) finalizeRemoteTaskLocked(tsk *TaskRunner, runner *db.Runner) { if util.HAEnabled() { p.refreshTaskStatusFromDB(tsk) - if tsk.Task.End != nil { - // Another node may have persisted End before onTaskStop ran (e.g. - // crash between saveStatus and the queue drain). Release any stale - // shared pool state without re-running finish or autorun. - p.onTaskStop(tsk) - return - } + } + + if tsk.Task.End != nil { + // Another node may have persisted End before onTaskStop ran (e.g. + // crash between saveStatus and the queue drain). Release any stale + // shared pool state without re-running finish or autorun. + p.onTaskStop(tsk) + return } if runner != nil { diff --git a/services/tasks/runner_reconciler.go b/services/tasks/runner_reconciler.go index c08ced66a..38910edad 100644 --- a/services/tasks/runner_reconciler.go +++ b/services/tasks/runner_reconciler.go @@ -195,6 +195,11 @@ func (p *TaskPool) failTaskRunnerLost(tsk *TaskRunner, runner *db.Runner, reason } if tsk.Task.Status.IsFinished() { + // Another node (or the runner report on this node) already persisted a + // terminal status. Complete finalization instead of bailing: if we won + // the finalize lock over FinalizeRemoteTask, that path will not run and + // pool/Redis state (running set, claims, End, autorun) would leak. + p.finalizeRemoteTaskLocked(tsk, runner) return } @@ -264,6 +269,22 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso tsk.Logf("Runner #%d lost the task: %s. Returning task to queue.", runnerID, reason) + // Re-check the DB immediately before mutating: another node may have + // received a concurrent "running" report while we held the finalize lock. + if util.HAEnabled() { + p.refreshTaskStatusFromDB(tsk) + if tsk.Task.Status != task_logger.TaskStartingStatus && + tsk.Task.Status != task_logger.TaskWaitingStatus { + return + } + if tsk.Task.RunnerID == nil || *tsk.Task.RunnerID != runnerID { + return + } + } + + prevRunnerID := tsk.Task.RunnerID + prevStatus := tsk.Task.Status + tsk.Task.RunnerID = nil tsk.SetStatus(task_logger.TaskWaitingStatus) @@ -274,6 +295,10 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso "task_id": tsk.Task.ID, "context": "runner_reconciler", }).Error("failed to persist requeued task") + // Roll back in-memory changes so the next reconcile tick retries via + // the runner-liveness path instead of mis-routing as undispatched. + tsk.Task.RunnerID = prevRunnerID + tsk.Task.Status = prevStatus return } diff --git a/services/tasks/runner_reconciler_test.go b/services/tasks/runner_reconciler_test.go index 22318261f..7937f3ff6 100644 --- a/services/tasks/runner_reconciler_test.go +++ b/services/tasks/runner_reconciler_test.go @@ -720,6 +720,9 @@ func TestRequeueTaskRunnerOffline_PersistError(t *testing.T) { // Persist failed: the task must not be enqueued (the old runner could // still pull it), and no requeue event must be emitted. + assert.NotNil(t, tsk.Task.RunnerID) + assert.Equal(t, runnerID, *tsk.Task.RunnerID) + assert.Equal(t, task_logger.TaskWaitingStatus, tsk.Task.Status) assert.Equal(t, 0, state.QueueLen()) assert.Empty(t, pool.queueEvents) } @@ -749,6 +752,34 @@ func TestRequeueTaskRunnerOffline_HA(t *testing.T) { assert.Equal(t, 1, state.QueueLen()) } +func TestRequeueTaskRunnerOffline_HA_SkipsWhenDBShowsRunning(t *testing.T) { + setupReconcilerConfig(t) + + store := sql.CreateTestStore() + util.Config.HA = &util.HAConfig{Enabled: true} + state := NewMemoryTaskStateStore() + pool := newReconcilerTestPool(store, state) + + newTask, runnerID := createReconcilerTestTask(t, store, task_logger.TaskStartingStatus, nil) + + // Another node persisted "running" while this node still has a stale "starting" copy. + running := newTask + running.Status = task_logger.TaskRunningStatus + require.NoError(t, store.UpdateTask(running)) + + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} + state.SetRunning(tsk) + + pool.requeueTaskRunnerOffline(tsk, runnerID, "runner is offline") + + assert.Equal(t, task_logger.TaskRunningStatus, tsk.Task.Status) + assert.NotNil(t, tsk.Task.RunnerID) + assert.Equal(t, runnerID, *tsk.Task.RunnerID) + assert.Equal(t, 0, state.QueueLen()) + assert.Empty(t, pool.queueEvents) +} + func TestFailTaskRunnerLost_HA(t *testing.T) { t.Run("DB row still running: task failed", func(t *testing.T) { setupReconcilerConfig(t) @@ -771,10 +802,9 @@ func TestFailTaskRunnerLost_HA(t *testing.T) { assert.NotNil(t, tsk.Task.End) }) - t.Run("DB row already finished: no-op", func(t *testing.T) { + t.Run("DB row already finished with End: releases stale pool state", func(t *testing.T) { setupReconcilerConfig(t) - // CreateTestStore replaces util.Config, so HA is enabled after it. store := sql.CreateTestStore() util.Config.HA = &util.HAConfig{Enabled: true} state := NewMemoryTaskStateStore() @@ -783,22 +813,61 @@ func TestFailTaskRunnerLost_HA(t *testing.T) { now := time.Now() newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now) - // Another node already finished the task in the DB. finished := newTask finished.Status = task_logger.TaskSuccessStatus finished.End = &now require.NoError(t, store.UpdateTask(finished)) - // Stale in-memory copy still says "running". - tsk := &TaskRunner{Task: newTask, pool: &pool} + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} state.SetRunning(tsk) + state.AddActive(tsk.Task.ProjectID, tsk) pool.failTaskRunnerLost(tsk, nil, "runner stopped responding") - // The DB refresh observed the terminal status: nothing was failed. assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status) + assert.Equal(t, 0, state.RunningCount()) + assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID)) assert.Empty(t, pool.queueEvents) }) + + t.Run("DB row finished without End: completes finalization", func(t *testing.T) { + setupReconcilerConfig(t) + + store := sql.CreateTestStore() + util.Config.HA = &util.HAConfig{Enabled: true} + state := NewMemoryTaskStateStore() + pool := newReconcilerTestPool(store, state) + + now := time.Now() + newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now) + + // Runner reported terminal success on another node but lost the finalize race. + reported := newTask + reported.Status = task_logger.TaskSuccessStatus + require.NoError(t, store.UpdateTask(reported)) + + staleTask := newTask + tsk := &TaskRunner{Task: staleTask, pool: &pool} + state.SetRunning(tsk) + state.AddActive(tsk.Task.ProjectID, tsk) + + pool.failTaskRunnerLost(tsk, nil, "runner stopped responding") + + assert.NotNil(t, tsk.Task.End) + assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status) + + select { + case ev := <-pool.queueEvents: + assert.Equal(t, EventTypeFinished, ev.eventType) + pool.onTaskStop(ev.task) + default: + t.Fatal("expected EventTypeFinished in queueEvents") + } + + assert.Equal(t, 0, state.RunningCount()) + assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID)) + }) } func TestRequeueUndispatchedTask(t *testing.T) {