-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix(tasks): complete finalization on reconciler race and notify workflow on template stop #3980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -195,6 +195,11 @@ func (p *TaskPool) failTaskRunnerLost(tsk *TaskRunner, runner *db.Runner, reason | |
| } | ||
|
|
||
| if tsk.Task.Status.IsFinished() { | ||
| // Another node (or the runner report on this node) already persisted a | ||
| // terminal status. Complete finalization instead of bailing: if we won | ||
| // the finalize lock over FinalizeRemoteTask, that path will not run and | ||
| // pool/Redis state (running set, claims, End, autorun) would leak. | ||
| p.finalizeRemoteTaskLocked(tsk, runner) | ||
| return | ||
| } | ||
|
|
||
|
|
@@ -264,6 +269,22 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso | |
|
|
||
| tsk.Logf("Runner #%d lost the task: %s. Returning task to queue.", runnerID, reason) | ||
|
|
||
| // Re-check the DB immediately before mutating: another node may have | ||
| // received a concurrent "running" report while we held the finalize lock. | ||
| if util.HAEnabled() { | ||
| p.refreshTaskStatusFromDB(tsk) | ||
| if tsk.Task.Status != task_logger.TaskStartingStatus && | ||
| tsk.Task.Status != task_logger.TaskWaitingStatus { | ||
| return | ||
|
Comment on lines
+276
to
+278
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In HA, if this second DB refresh observes Useful? React with 👍 / 👎. |
||
| } | ||
| if tsk.Task.RunnerID == nil || *tsk.Task.RunnerID != runnerID { | ||
| return | ||
| } | ||
| } | ||
|
|
||
| prevRunnerID := tsk.Task.RunnerID | ||
| prevStatus := tsk.Task.Status | ||
|
|
||
| tsk.Task.RunnerID = nil | ||
| tsk.SetStatus(task_logger.TaskWaitingStatus) | ||
|
|
||
|
|
@@ -274,6 +295,10 @@ func (p *TaskPool) requeueTaskRunnerOffline(tsk *TaskRunner, runnerID int, reaso | |
| "task_id": tsk.Task.ID, | ||
| "context": "runner_reconciler", | ||
| }).Error("failed to persist requeued task") | ||
| // Roll back in-memory changes so the next reconcile tick retries via | ||
| // the runner-liveness path instead of mis-routing as undispatched. | ||
| tsk.Task.RunnerID = prevRunnerID | ||
| tsk.Task.Status = prevStatus | ||
| return | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
SetWaitingTasksToStoppedfails, or when a snapped task no longer matches the waiting update, this newly added call still reloads the task and passes it toHandleWorkflowTaskCompletionwithout checking thatStatusis actuallystopped. In that context a workflow node can advance from a task that remainswaiting/running; only notify after confirming the reload is terminal and was actually stopped.Useful? React with 👍 / 👎.