diff --git a/pkg/github/runners/message-processor.go b/pkg/github/runners/message-processor.go index 9e83de4..0a426e1 100644 --- a/pkg/github/runners/message-processor.go +++ b/pkg/github/runners/message-processor.go @@ -35,8 +35,8 @@ func NewRunnerMessageProcessor(ctx context.Context, runnerManager RunnerManagerI runnerScaleSetName: runnerScaleSet.Name, upstreamCanceledJobs: map[string]bool{}, upstreamCanceledJobsMutex: sync.RWMutex{}, - jobContextCancels: map[string]context.CancelFunc{}, - jobContextCancelsMutex: sync.Mutex{}, + runnerContextCancels: map[string]context.CancelFunc{}, + runnerContextCancelsMutex: sync.Mutex{}, vmTracker: vmTracker, } } @@ -125,33 +125,31 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale jobId = defaultJobId } - jobContext, cancel := context.WithCancel(p.ctx) - p.storeJobContextCancel(jobId, cancel) - go func() { var executionErr error defer p.removeUpstreamCanceledJob(jobId) - executor, commands, provisioningErr := p.provisionRunnerWithRetry(jobContext, jobId) + executor, commands, provisioningErr := p.provisionRunnerWithRetry(p.ctx, jobId) if provisioningErr != nil { if errors.Is(provisioningErr, context.Canceled) { p.logger.Infof("provisioning canceled for %s", p.runnerScaleSetName) } else { p.logger.Errorf("unable to provision Orka runner for %s: %v", p.runnerScaleSetName, provisioningErr) } - p.cancelJobContext(jobId, "provisioning failed") return } if executor == nil { p.logger.Errorf("provisioning returned nil executor for %s", p.runnerScaleSetName) - p.cancelJobContext(jobId, "provisioning failed") return } - context.AfterFunc(jobContext, func() { - p.logger.Infof("cleaning up resources for %s after job context is canceled", executor.VMName) + runnerContext, cancel := context.WithCancel(p.ctx) + p.storeRunnerContextCancel(executor.VMName, cancel) + + context.AfterFunc(runnerContext, func() { + p.logger.Infof("cleaning up resources for %s after runner context is canceled", executor.VMName) p.runnerProvisioner.CleanupResources(context.WithoutCancel(p.ctx), executor.VMName) p.vmTracker.Untrack(executor.VMName) }) @@ -166,26 +164,26 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale var exitErr *ssh.ExitError if errors.Is(executionErr, context.Canceled) { - cancelReason = "job context was canceled" - p.logger.Infof("job context canceled for JobId %s. Cleaning up resources.", jobId) + cancelReason = "runner context was canceled" + p.logger.Infof("runner context canceled for RunnerName %s with JobId %s. Cleaning up resources.", executor.VMName, jobId) } else if executionErr != nil { if errors.As(executionErr, &exitErr) { cancelReason = fmt.Sprintf("execution failed with exit code %d", exitErr.ExitStatus()) - p.logger.Errorf("execution failed with exit code %d for JobId %s. Cleaning up resources.", exitErr.ExitStatus(), jobId) + p.logger.Errorf("execution failed with exit code %d for RunnerName %s with JobId %s. Cleaning up resources.", exitErr.ExitStatus(), executor.VMName, jobId) } else { cancelReason = fmt.Sprintf("execution failed: %v", executionErr) - p.logger.Errorf("execution failed for JobId %s. Cleaning up resources: %v", jobId, executionErr) + p.logger.Errorf("execution failed for RunnerName %s with JobId %s. Cleaning up resources: %v", executor.VMName, jobId, executionErr) } } else { cancelReason = "execution completed successfully" - p.logger.Infof("execution completed successfully for JobId %s. Cleaning up resources.", jobId) + p.logger.Infof("execution completed successfully for RunnerName %s with JobId %s. Cleaning up resources.", executor.VMName, jobId) } - p.cancelJobContext(jobId, cancelReason) + p.cancelRunnerContext(executor.VMName, cancelReason) }() p.vmTracker.Track(executor.VMName) - executionErr = p.executeJobCommands(jobContext, jobId, executor, commands) + executionErr = p.executeJobCommands(runnerContext, jobId, executor, commands) }() } case "JobStarted": @@ -193,7 +191,7 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale if err := json.Unmarshal(message, &jobStarted); err != nil { return fmt.Errorf("could not decode job started message. %w", err) } - p.logger.Infof("Job started message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d", jobStarted.JobId, jobStarted.RunnerRequestId, jobStarted.RunnerId) + p.logger.Infof("Job started message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s", jobStarted.JobId, jobStarted.RunnerRequestId, jobStarted.RunnerId, jobStarted.RunnerName) case "JobCompleted": var jobCompleted types.JobCompleted if err := json.Unmarshal(message, &jobCompleted); err != nil { @@ -202,7 +200,12 @@ func (p *RunnerMessageProcessor) processRunnerMessage(message *types.RunnerScale p.logger.Infof("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, RunnerName: %s, with Result: %s", jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId, jobCompleted.RunnerName, jobCompleted.Result) - p.cancelJobContext(jobCompleted.JobId, "Job completed webhook received") + runnerName := jobCompleted.RunnerName + if runnerName != "" { + p.cancelRunnerContext(runnerName, "Job completed webhook received") + } else { + p.logger.Warnf("Job completed message received for JobId: %s, RunnerRequestId: %d, RunnerId: %d, but RunnerName is empty. Skipping cleanup.", jobCompleted.JobId, jobCompleted.RunnerRequestId, jobCompleted.RunnerId) + } if jobCompleted.JobId != "" && (jobCompleted.Result == cancelledStatus || jobCompleted.Result == ignoredStatus || jobCompleted.Result == abandonedStatus) { p.setUpstreamCanceledJob(jobCompleted.JobId) @@ -284,22 +287,22 @@ func (p *RunnerMessageProcessor) removeUpstreamCanceledJob(jobId string) { delete(p.upstreamCanceledJobs, jobId) } -func (p *RunnerMessageProcessor) storeJobContextCancel(jobId string, cancel context.CancelFunc) { - p.jobContextCancelsMutex.Lock() - defer p.jobContextCancelsMutex.Unlock() - p.jobContextCancels[jobId] = cancel +func (p *RunnerMessageProcessor) storeRunnerContextCancel(runnerName string, cancel context.CancelFunc) { + p.runnerContextCancelsMutex.Lock() + defer p.runnerContextCancelsMutex.Unlock() + p.runnerContextCancels[runnerName] = cancel } -func (p *RunnerMessageProcessor) cancelJobContext(jobId string, reason string) { - p.jobContextCancelsMutex.Lock() - defer p.jobContextCancelsMutex.Unlock() +func (p *RunnerMessageProcessor) cancelRunnerContext(runnerName string, reason string) { + p.runnerContextCancelsMutex.Lock() + defer p.runnerContextCancelsMutex.Unlock() - if cancel, exists := p.jobContextCancels[jobId]; exists { - p.logger.Infof("canceling job context for JobId: %s. Triggered by: %s", jobId, reason) + if cancel, exists := p.runnerContextCancels[runnerName]; exists { + p.logger.Infof("canceling runner context for RunnerName: %s. Triggered by: %s", runnerName, reason) cancel() - delete(p.jobContextCancels, jobId) + delete(p.runnerContextCancels, runnerName) } else { - p.logger.Debugf("job context for JobId: %s already canceled or not found. Triggered by: %s", jobId, reason) + p.logger.Debugf("runner context for RunnerName: %s already canceled or not found. Triggered by: %s", runnerName, reason) } } diff --git a/pkg/github/runners/types.go b/pkg/github/runners/types.go index 75fe775..1c2fe7b 100644 --- a/pkg/github/runners/types.go +++ b/pkg/github/runners/types.go @@ -45,6 +45,6 @@ type RunnerMessageProcessor struct { runnerScaleSetName string upstreamCanceledJobs map[string]bool upstreamCanceledJobsMutex sync.RWMutex - jobContextCancels map[string]context.CancelFunc - jobContextCancelsMutex sync.Mutex + runnerContextCancels map[string]context.CancelFunc + runnerContextCancelsMutex sync.Mutex }