From d5c4c93f589290f000578b7cd92fc5d107442f63 Mon Sep 17 00:00:00 2001 From: ispasov Date: Thu, 19 Feb 2026 13:24:54 +0200 Subject: [PATCH] Change the cancel context to be per runner Previously, jobs were tied to runners, so when a job completed we cancelled that runner's context. The job and the runner were paired during the provisioning process. However, a job can be picked up by a runner other than the one provisioned for it. In that case, when the job completes, the integration cancels the context of the wrong runner. To make sure this does not happen, use the runner name, not the job id, to look up the context to cancel. --- pkg/github/runners/message-processor.go | 63 +++++++++++++------------ pkg/github/runners/types.go | 4 +- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 9e83de4..0a426e1 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -35,8 +35,8 @@ func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerI runnerScaleSetName: runnerScaleSet.Name, upstreamCanceledJobs: map[string]bool{}, upstreamCanceledJobsMutex: sync.RWMutex{}, - jobContextCancels: map[string]context.CancelFunc{}, - jobContextCancelsMutex: sync.Mutex{}, + runnerContextCancels: map[string]context.CancelFunc{}, + runnerContextCancelsMutex: sync.Mutex{}, vmTracker: vmTracker, } } @@ -125,33 +125,31 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale jobId = defaultJobId } - jobContext, cancel := context.WithCancel(p.ctx) - p.storeJobContextCancel(jobId, cancel) - go func() { var executionErr error defer p.removeUpstreamCanceledJob(jobId) - executor, commands, provisioningErr := p.provisionRunnerWithRetry(jobContext, jobId) + executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, jobId) if provisioningErr != nil { if errors.Is(provisioningErr, context.Canceled) { p.logger.Infof("provisioning canceled 
for %s", p.runnerScaleSetName) } else { p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) } - p.cancelJobContext(jobId, "provisioning failed") return } if executor == nil { p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) - p.cancelJobContext(jobId, "provisioning failed") return } - context.AfterFunc(jobContext, func() { - p.logger.Infof("cleaning up resources for %s after job context is canceled", executor.VMName) + runnerContext, cancel := context.WithCancel(p.ctx) + p.storeRunnerContextCancel(executor.VMName, cancel) + + context.AfterFunc(runnerContext, func() { + p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) p.vmTracker.Untrack(executor.VMName) }) @@ -166,26 +164,26 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale var exitErr *ssh.ExitError if errors.Is(executionErr, context.Canceled) { - cancelReason = "job context was canceled" - p.logger.Infof("job context canceled for JobId %s. Cleaning up resources.", jobId) + cancelReason = "runner context was canceled" + p.logger.Infof("runner context canceled for RunnerName %s with JobId %s. Cleaning up resources.", executor.VMName, jobId) } else if executionErr != nil { if errors.As(executionErr, &exitErr) { cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) - p.logger.Errorf("execution failed with exit code %d for JobId %s. Cleaning up resources.", exitErr.ExitStatus(), jobId) + p.logger.Errorf("execution failed with exit code %d for RunnerName %s with JobId %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, jobId) } else { cancelReason = fmt.Sprintf("execution failed: %v", executionErr) - p.logger.Errorf("execution failed for JobId %s. 
Cleaning up resources: %v", jobId, executionErr) + p.logger.Errorf("execution failed for RunnerName %s with JobId %s. Cleaning up resources: %v", executor.VMName, jobId, executionErr) } } else { cancelReason = "execution completed successfully" - p.logger.Infof("execution completed successfully for JobId %s. Cleaning up resources.", jobId) + p.logger.Infof("execution completed successfully for RunnerName %s with JobId %s. Cleaning up resources.", executor.VMName, jobId) } - p.cancelJobContext(jobId, cancelReason) + p.cancelRunnerContext(executor.VMName, cancelReason) }() p.vmTracker.Track(executor.VMName) - executionErr = p.executeJobCommands(jobContext, jobId, executor, commands) + executionErr = p.executeJobCommands(runnerContext, jobId, executor, commands) }() } case "JobStarted": @@ -193,7 +191,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale if err := json.Unmarshal(message, &jobStarted); err != nil { return fmt.Errorf("could not decode job started message. 
%w", err) } - p.logger.Infof("Job started message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d", jobStarted.JobId, jobStarted.RunnerRequestId, jobStarted.RunnerId) + p.logger.Infof("Job started message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s", jobStarted.JobId, jobStarted.RunnerRequestId, jobStarted.RunnerId, jobStarted.RunnerName) case "JobCompleted": var jobCompleted types.JobCompleted if err := json.Unmarshal(message, &jobCompleted); err != nil { @@ -202,7 +200,12 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale p.logger.Infof("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s, with Result: %s", jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId, jobCompleted.RunnerName, jobCompleted.Result) - p.cancelJobContext(jobCompleted.JobId, "Job completed webhook received") + runnerName := jobCompleted.RunnerName + if runnerName != "" { + p.cancelRunnerContext(runnerName, "Job completed webhook received") + } else { + p.logger.Warnf("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, but RunnerName is empty. 
Skipping cleanup.", jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId) + } if jobCompleted.JobId != "" && (jobCompleted.Result == cancelledStatus || jobCompleted.Result == ignoredStatus || jobCompleted.Result == abandonedStatus) { p.setUpstreamCanceledJob(jobCompleted.JobId) @@ -284,22 +287,22 @@ func (p *RunnerMessageProcessor) removeUpstreamCanceledJob(jobId string) { delete(p.upstreamCanceledJobs, jobId) } -func (p *RunnerMessageProcessor) storeJobContextCancel(jobId string, cancel context.CancelFunc) { - p.jobContextCancelsMutex.Lock() - defer p.jobContextCancelsMutex.Unlock() - p.jobContextCancels[jobId] = cancel +func (p *RunnerMessageProcessor) storeRunnerContextCancel(runnerName string, cancel context.CancelFunc) { + p.runnerContextCancelsMutex.Lock() + defer p.runnerContextCancelsMutex.Unlock() + p.runnerContextCancels[runnerName] = cancel } -func (p *RunnerMessageProcessor) cancelJobContext(jobId string, reason string) { - p.jobContextCancelsMutex.Lock() - defer p.jobContextCancelsMutex.Unlock() +func (p *RunnerMessageProcessor) cancelRunnerContext(runnerName string, reason string) { + p.runnerContextCancelsMutex.Lock() + defer p.runnerContextCancelsMutex.Unlock() - if cancel, exists := p.jobContextCancels[jobId]; exists { - p.logger.Infof("canceling job context for JobId: %s. Triggered by: %s", jobId, reason) + if cancel, exists := p.runnerContextCancels[runnerName]; exists { + p.logger.Infof("canceling runner context for RunnerName: %s. Triggered by: %s", runnerName, reason) cancel() - delete(p.jobContextCancels, jobId) + delete(p.runnerContextCancels, runnerName) } else { - p.logger.Debugf("job context for JobId: %s already canceled or not found. Triggered by: %s", jobId, reason) + p.logger.Debugf("runner context for RunnerName: %s already canceled or not found. 
Triggered by: %s", runnerName, reason) } } diff --git a/pkg/github/runners/types.go b/pkg/github/runners/types.go index 75fe775..1c2fe7b 100644 --- a/pkg/github/runners/types.go +++ b/pkg/github/runners/types.go @@ -45,6 +45,6 @@ type RunnerMessageProcessor struct { runnerScaleSetName string upstreamCanceledJobs map[string]bool upstreamCanceledJobsMutex sync.RWMutex - jobContextCancels map[string]context.CancelFunc - jobContextCancelsMutex sync.Mutex + runnerContextCancels map[string]context.CancelFunc + runnerContextCancelsMutex sync.Mutex }