Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions cmds/dutagent/states.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type runCmdArgs struct {
cmdMsg *pb.Command
dev dut.Device
cmd dut.Command
broker *dutagent.Broker
session module.Session
moduleErrCh chan error
brokerErrCh <-chan error
Expand Down Expand Up @@ -151,8 +152,10 @@ func releaseAutoLock(_ context.Context, args runCmdArgs) (runCmdArgs, fsm.State[
// in a separate goroutine, this state will not wait for the execution to finish.
// Further, worker goroutines will be started to serve the module-to-client communication
// during the module execution.
//
//nolint:funlen
func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State[runCmdArgs], error) {
broker := &dutagent.Broker{}
args.broker = &dutagent.Broker{}

// Deferred initialization of the moduleErr channel: only create if not already provided
// (tests may still pass a custom channel).
Expand All @@ -163,7 +166,7 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State
rpcCtx := ctx
modCtx, modCtxCancel := context.WithCancel(rpcCtx)

moduleSession, brokerErrCh := broker.Start(modCtx, args.stream)
moduleSession, brokerErrCh := args.broker.Start(modCtx, args.stream)
args.brokerErrCh = brokerErrCh
args.session = moduleSession

Expand All @@ -178,12 +181,19 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State
// Run the modules in a goroutine.
// Termination of the module execution is signaled by closing the moduleErrCh channel.
go func() {
defer modCtxCancel() // Ensure workers exit even if stream doesn't close

cnt := len(args.cmd.Modules)

for idx, module := range args.cmd.Modules {
if ctx.Err() != nil {
log.Printf("Execution aborted, %d of %d modules done: %v", idx, cnt, ctx.Err())
modCtxCancel()
args.broker.Shutdown()

// Wait for file transfers to complete (workers will exit gracefully)
log.Print("Waiting for file transfers to complete...")
args.broker.WaitForTransfersToComplete()
Comment on lines +191 to +195

Copilot AI Feb 9, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goroutine started by executeModules blocks on WaitForTransfersToComplete() while modCtxCancel() is deferred until that goroutine returns. If the client disconnects or a transfer never completes or is never acknowledged, the goroutine can block forever and the Run RPC won’t terminate. Consider adding a timeout/context to the wait, or canceling the workers and force-cleaning outstanding transfers on ctx cancellation.

Copilot uses AI. Check for mistakes.
log.Print("All file transfers completed")

return
}
Expand All @@ -195,14 +205,25 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State
args.moduleErrCh <- err

log.Printf("Module %q failed: %v", module.Config.Name, err)
modCtxCancel()
args.broker.Shutdown()

// Wait for file transfers to complete (workers will exit gracefully)
log.Print("Waiting for file transfers to complete...")
args.broker.WaitForTransfersToComplete()
log.Print("All file transfers completed")

return
}
}

log.Print("All modules finished successfully")
modCtxCancel()
args.broker.Shutdown()

// Wait for file transfers to complete (workers will exit gracefully)
log.Print("Waiting for file transfers to complete...")
args.broker.WaitForTransfersToComplete()
log.Print("All file transfers completed")

close(args.moduleErrCh)
}()

Expand Down
Loading