Skip to content

Commit 0b357a5

Browse files
Merge pull request #19 from alphauslabs/cancel_running_test
feat: add cancellation of test when a pr is closed
2 parents f3b79de + 76f2b9c commit 0b357a5

1 file changed

Lines changed: 89 additions & 7 deletions

File tree

main.go

Lines changed: 89 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -383,11 +383,45 @@ func distributeSQS(app *appctx, runID string, tagFilters []string, metadata map[
383383
return true
384384
}
385385

386+
type cancelledEntry struct {
387+
expiresAt time.Time
388+
}
389+
386390
type appctx struct {
387-
pub *lspubsub.PubsubPublisher // starter publisher topic
388-
rpub *lspubsub.PubsubPublisher // topic to publish reports
389-
mtx *sync.Mutex
390-
topicArn *string
391+
pub *lspubsub.PubsubPublisher // starter publisher topic
392+
rpub *lspubsub.PubsubPublisher // topic to publish reports
393+
mtx *sync.Mutex
394+
topicArn *string
395+
cancelledMtx sync.RWMutex
396+
cancelledRuns map[string]cancelledEntry // run_ids that have been cancelled, with expiry
397+
}
398+
399+
const cancelledRunTTL = 10 * time.Minute
400+
401+
func (a *appctx) cancelRun(runID string) {
402+
a.cancelledMtx.Lock()
403+
defer a.cancelledMtx.Unlock()
404+
a.cancelledRuns[runID] = cancelledEntry{expiresAt: time.Now().Add(cancelledRunTTL)}
405+
log.Printf("cancelRun: run_id=%s will expire at %s", runID, a.cancelledRuns[runID].expiresAt.Format(time.RFC3339))
406+
}
407+
408+
func (a *appctx) isRunCancelled(runID string) bool {
409+
if runID == "" {
410+
return false
411+
}
412+
a.cancelledMtx.RLock()
413+
entry, ok := a.cancelledRuns[runID]
414+
a.cancelledMtx.RUnlock()
415+
if !ok {
416+
return false
417+
}
418+
if time.Now().After(entry.expiresAt) {
419+
a.cancelledMtx.Lock()
420+
delete(a.cancelledRuns, runID)
421+
a.cancelledMtx.Unlock()
422+
return false
423+
}
424+
return true
391425
}
392426

393427
// Our message processing callback.
@@ -484,6 +518,10 @@ func process(ctx any, data []byte) error {
484518
}
485519
case "process":
486520
log.Printf("process: %+v", c)
521+
if app.isRunCancelled(c.ID) {
522+
log.Printf("process: run_id=%s is cancelled, skipping scenario %s", c.ID, c.Scenario)
523+
return nil
524+
}
487525
doScenario(&doScenarioInput{
488526
app: app,
489527
ScenarioFiles: []string{c.Scenario},
@@ -558,7 +596,50 @@ func handleScenarioCompletion(ctx any, data []byte) error {
558596
}
559597
}
560598

561-
case "completed":
599+
case "cancelled":
600+
log.Printf("run cancelled: run_id=%s repo=%s sha=%s pr=%s",
601+
msg.RunID, msg.Repository, msg.CommitSHA, msg.PRNumber)
602+
if app, ok := ctx.(*appctx); ok && app != nil && msg.RunID != "" {
603+
app.cancelRun(msg.RunID)
604+
log.Printf("cancelled: run_id=%s marked as cancelled in-process, pending scenarios will be skipped", msg.RunID)
605+
}
606+
607+
if msg.CommitSHA == "" || msg.Repository == "" {
608+
log.Printf("cancelled: missing commit_sha or repository, skipping github status update")
609+
return nil
610+
}
611+
612+
if err := postCommitStatus(
613+
githubtoken,
614+
msg.CommitSHA,
615+
msg.Repository,
616+
msg.RunURL,
617+
"failure",
618+
fmt.Sprintf("Test run cancelled — PR #%s was closed", msg.PRNumber),
619+
); err != nil {
620+
log.Printf("postCommitStatus (cancelled) failed: %v", err)
621+
}
622+
if repslack != "" {
623+
payload := SlackMessage{
624+
Attachments: []SlackAttachment{
625+
{
626+
Color: "warning",
627+
Title: "Test Run Cancelled",
628+
Text: fmt.Sprintf("*PR #%s* in `%s` was closed.\nIn-progress test run `%s` has been cancelled.\n<%s|View workflow>",
629+
msg.PRNumber, msg.Repository, msg.RunID, msg.RunURL),
630+
Footer: fmt.Sprintf("oops • pr: %s • sha: %.7s", msg.PRNumber, msg.CommitSHA),
631+
Timestamp: time.Now().Unix(),
632+
MrkdwnIn: []string{"text"},
633+
},
634+
},
635+
}
636+
637+
if err := payload.Notify(repslack); err != nil {
638+
log.Printf("Notify (slack) cancelled failed: %v", err)
639+
}
640+
}
641+
642+
case "completed":
562643
log.Printf("run completed: run_id=%s overall_status=%s failed=%d repo=%s sha=%s",
563644
msg.RunID, msg.OverallStatus, msg.FailedCount, msg.Repository, msg.CommitSHA)
564645

@@ -657,7 +738,8 @@ func run(ctx context.Context, done chan error) {
657738
}
658739

659740
app := &appctx{
660-
mtx: &sync.Mutex{},
741+
mtx: &sync.Mutex{},
742+
cancelledRuns: make(map[string]cancelledEntry),
661743
}
662744
ctx0, cancelCtx0 := context.WithCancel(ctx)
663745
defer cancelCtx0()
@@ -754,7 +836,7 @@ func run(ctx context.Context, done chan error) {
754836

755837
done1 := make(chan error, 1)
756838
go func() {
757-
ls := lspubsub.NewLengthySubscriber(nil, project, scenariopubsub, handleScenarioCompletion)
839+
ls := lspubsub.NewLengthySubscriber(app, project, scenariopubsub, handleScenarioCompletion)
758840
err := ls.Start(ctx0, done1)
759841
if err != nil {
760842
log.Fatalf("listener for scenario progress failed: %v", err)

0 commit comments

Comments
 (0)