Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions services/tasks/TaskPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,14 @@ func (p *TaskPool) FinalizeRemoteTask(tsk *TaskRunner, runner *db.Runner) {
func (p *TaskPool) finalizeRemoteTaskLocked(tsk *TaskRunner, runner *db.Runner) {
if util.HAEnabled() {
p.refreshTaskStatusFromDB(tsk)
if tsk.Task.End != nil {
// Another node may have persisted End before onTaskStop ran (e.g.
// crash between saveStatus and the queue drain). Release any stale
// shared pool state without re-running finish or autorun.
p.onTaskStop(tsk)
return
}
}

if tsk.Task.End != nil {
// Another node may have persisted End before onTaskStop ran (e.g.
// crash between saveStatus and the queue drain). Release any stale
// shared pool state without re-running finish or autorun.
p.onTaskStop(tsk)
return
}

if runner != nil {
Expand Down
25 changes: 25 additions & 0 deletions services/tasks/runner_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ func (p *TaskPool) failTaskRunnerLost(tsk *TaskRunner, runner *db.Runner, reason
}

if tsk.Task.Status.IsFinished() {
// Another node (or the runner report on this node) already persisted a
// terminal status. Complete finalization instead of bailing: if we won
// the finalize lock over FinalizeRemoteTask, that path will not run and
// pool/Redis state (running set, claims, End, autorun) would leak.
p.finalizeRemoteTaskLocked(tsk, runner)
return
}

Expand Down Expand Up @@ -264,6 +269,22 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso

tsk.Logf("Runner #%d lost the task: %s. Returning task to queue.", runnerID, reason)

// Re-check the DB immediately before mutating: another node may have
// received a concurrent "running" report while we held the finalize lock.
if util.HAEnabled() {
p.refreshTaskStatusFromDB(tsk)
if tsk.Task.Status != task_logger.TaskStartingStatus &&
tsk.Task.Status != task_logger.TaskWaitingStatus {
return
}
if tsk.Task.RunnerID == nil || *tsk.Task.RunnerID != runnerID {
return
}
}

prevRunnerID := tsk.Task.RunnerID
prevStatus := tsk.Task.Status

tsk.Task.RunnerID = nil
tsk.SetStatus(task_logger.TaskWaitingStatus)

Expand All @@ -274,6 +295,10 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso
"task_id": tsk.Task.ID,
"context": "runner_reconciler",
}).Error("failed to persist requeued task")
// Roll back in-memory changes so the next reconcile tick retries via
// the runner-liveness path instead of mis-routing as undispatched.
tsk.Task.RunnerID = prevRunnerID
tsk.Task.Status = prevStatus
return
}

Expand Down
81 changes: 75 additions & 6 deletions services/tasks/runner_reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,9 @@ func TestRequeueTaskRunnerOffline_PersistError(t *testing.T) {

// Persist failed: the task must not be enqueued (the old runner could
// still pull it), and no requeue event must be emitted.
assert.NotNil(t, tsk.Task.RunnerID)
assert.Equal(t, runnerID, *tsk.Task.RunnerID)
assert.Equal(t, task_logger.TaskWaitingStatus, tsk.Task.Status)
assert.Equal(t, 0, state.QueueLen())
assert.Empty(t, pool.queueEvents)
}
Expand Down Expand Up @@ -749,6 +752,34 @@ func TestRequeueTaskRunnerOffline_HA(t *testing.T) {
assert.Equal(t, 1, state.QueueLen())
}

func TestRequeueTaskRunnerOffline_HA_SkipsWhenDBShowsRunning(t *testing.T) {
setupReconcilerConfig(t)

store := sql.CreateTestStore()
util.Config.HA = &util.HAConfig{Enabled: true}
state := NewMemoryTaskStateStore()
pool := newReconcilerTestPool(store, state)

newTask, runnerID := createReconcilerTestTask(t, store, task_logger.TaskStartingStatus, nil)

// Another node persisted "running" while this node still has a stale "starting" copy.
running := newTask
running.Status = task_logger.TaskRunningStatus
require.NoError(t, store.UpdateTask(running))

staleTask := newTask
tsk := &TaskRunner{Task: staleTask, pool: &pool}
state.SetRunning(tsk)

pool.requeueTaskRunnerOffline(tsk, runnerID, "runner is offline")

assert.Equal(t, task_logger.TaskRunningStatus, tsk.Task.Status)
assert.NotNil(t, tsk.Task.RunnerID)
assert.Equal(t, runnerID, *tsk.Task.RunnerID)
assert.Equal(t, 0, state.QueueLen())
assert.Empty(t, pool.queueEvents)
}

func TestFailTaskRunnerLost_HA(t *testing.T) {
t.Run("DB row still running: task failed", func(t *testing.T) {
setupReconcilerConfig(t)
Expand All @@ -771,10 +802,9 @@ func TestFailTaskRunnerLost_HA(t *testing.T) {
assert.NotNil(t, tsk.Task.End)
})

t.Run("DB row already finished: no-op", func(t *testing.T) {
t.Run("DB row already finished with End: releases stale pool state", func(t *testing.T) {
setupReconcilerConfig(t)

// CreateTestStore replaces util.Config, so HA is enabled after it.
store := sql.CreateTestStore()
util.Config.HA = &util.HAConfig{Enabled: true}
state := NewMemoryTaskStateStore()
Expand All @@ -783,22 +813,61 @@ func TestFailTaskRunnerLost_HA(t *testing.T) {
now := time.Now()
newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now)

// Another node already finished the task in the DB.
finished := newTask
finished.Status = task_logger.TaskSuccessStatus
finished.End = &now
require.NoError(t, store.UpdateTask(finished))

// Stale in-memory copy still says "running".
tsk := &TaskRunner{Task: newTask, pool: &pool}
staleTask := newTask
tsk := &TaskRunner{Task: staleTask, pool: &pool}
state.SetRunning(tsk)
state.AddActive(tsk.Task.ProjectID, tsk)

pool.failTaskRunnerLost(tsk, nil, "runner stopped responding")

// The DB refresh observed the terminal status: nothing was failed.
assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status)
assert.Equal(t, 0, state.RunningCount())
assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID))
assert.Empty(t, pool.queueEvents)
})

t.Run("DB row finished without End: completes finalization", func(t *testing.T) {
setupReconcilerConfig(t)

store := sql.CreateTestStore()
util.Config.HA = &util.HAConfig{Enabled: true}
state := NewMemoryTaskStateStore()
pool := newReconcilerTestPool(store, state)

now := time.Now()
newTask, _ := createReconcilerTestTask(t, store, task_logger.TaskRunningStatus, &now)

// Runner reported terminal success on another node but lost the finalize race.
reported := newTask
reported.Status = task_logger.TaskSuccessStatus
require.NoError(t, store.UpdateTask(reported))

staleTask := newTask
tsk := &TaskRunner{Task: staleTask, pool: &pool}
state.SetRunning(tsk)
state.AddActive(tsk.Task.ProjectID, tsk)

pool.failTaskRunnerLost(tsk, nil, "runner stopped responding")

assert.NotNil(t, tsk.Task.End)
assert.Equal(t, task_logger.TaskSuccessStatus, tsk.Task.Status)

select {
case ev := <-pool.queueEvents:
assert.Equal(t, EventTypeFinished, ev.eventType)
pool.onTaskStop(ev.task)
default:
t.Fatal("expected EventTypeFinished in queueEvents")
}

assert.Equal(t, 0, state.RunningCount())
assert.Equal(t, 0, state.ActiveCount(tsk.Task.ProjectID))
})
}

func TestRequeueUndispatchedTask(t *testing.T) {
Expand Down
Loading