From 851b660440d1d4fb63233f0645b540be4543cbf5 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Mon, 15 Jun 2026 20:02:50 +0300 Subject: [PATCH 1/2] Don't orphan the runner on shutdown On shutdown the daemon returned as soon as the done channel was closed, but the goroutine that terminates the runner (keepRunnerAlive) was still running its cleanup. The process could exit before the runner's process group was killed, leaving the runner behind. The service now tracks its goroutines with a WaitGroup and exposes a Wait method. keepRunnerAlive signals the group only after its cleanup defer has run, so both the Unix daemon and the Windows service handler now wait for the runner to be torn down before they return. Signed-off-by: Gabriel Adrian Samfira --- cmd/daemon_nix.go | 3 +++ service/service.go | 18 ++++++++++++++++++ service/service_windows.go | 7 ++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/cmd/daemon_nix.go b/cmd/daemon_nix.go index 84a53d5..34e8091 100644 --- a/cmd/daemon_nix.go +++ b/cmd/daemon_nix.go @@ -17,5 +17,8 @@ func runService(service *service.Service) error { } <-service.Done() + // Wait for the service goroutines to finish so the runner's process group is + // torn down before we exit; otherwise the runner is orphaned on shutdown. + service.Wait() return nil } diff --git a/service/service.go b/service/service.go index 22d5052..31d2ec8 100644 --- a/service/service.go +++ b/service/service.go @@ -67,6 +67,11 @@ type Service struct { cliMux sync.Mutex running bool done chan struct{} + // wg tracks the keepAliveLoop, loop and keepRunnerAlive goroutines so that + // shutdown can wait for them to finish — in particular the runner cleanup in + // keepRunnerAlive that terminates the runner's process group — before the + // process exits and orphans the runner. + wg sync.WaitGroup connecting chan struct{} connected chan struct{} @@ -78,6 +83,15 @@ func (s *Service) Done() chan struct{} { return s.done } +// Wait blocks until the service's goroutines have exited. This includes the +// runner cleanup in keepRunnerAlive that terminates the runner's process group, +// so callers should Wait after Done() fires to ensure the runner is torn down +// before the process exits. The goroutines all return on ctx cancellation or +// Stop(), so Wait does not block indefinitely. +func (s *Service) Wait() { + s.wg.Wait() +} + func (s *Service) getClient() (*garmWs.Reader, error) { s.cliMux.Lock() cli := s.cli @@ -301,6 +315,7 @@ func (s *Service) Start() error { s.running = true s.done = make(chan struct{}) + s.wg.Add(3) go s.keepAliveLoop() go s.loop() go s.keepRunnerAlive() @@ -503,6 +518,7 @@ func (s *Service) sleepWithCancel(d time.Duration) (shouldQuit bool) { } func (s *Service) keepRunnerAlive() { + defer s.wg.Done() retryCreate: state := s.determineRunnerState(s.isRunnerAlive()) if state == params.RunnerTerminated { @@ -585,6 +601,7 @@ retryStart: } func (s *Service) keepAliveLoop() { + defer s.wg.Done() var sleepTime time.Duration retryConnecting: if sleepTime > 0 { @@ -629,6 +646,7 @@ retryConnecting: } func (s *Service) loop() { + defer s.wg.Done() heartbeatTicker := time.NewTicker(30 * time.Second) defer func() { slog.InfoContext(s.ctx, "daemon is shutting down") diff --git a/service/service_windows.go b/service/service_windows.go index c2f170f..d6c4bc0 100644 --- a/service/service_windows.go +++ b/service/service_windows.go @@ -10,7 +10,6 @@ func (s *Service) Execute(args []string, r <-chan svc.ChangeRequest, status chan if err := s.Start(); err != nil { return false, 11 } - defer s.Stop() const cmdsAccepted = svc.AcceptStop | svc.AcceptShutdown status <- svc.Status{State: svc.StartPending} @@ -39,5 +38,11 @@ loop: } status <- svc.Status{State: svc.StopPending} + if stopErr := s.Stop(); stopErr != nil { + slog.ErrorContext(s.ctx, "failed to stop service", "error", stopErr) + } + // Wait for the service goroutines (including the runner cleanup) to finish + // before reporting the service stopped. + s.Wait() return false, 0 } From 952f439fba0bd2816aa08897bff8c18dbd842f88 Mon Sep 17 00:00:00 2001 From: Gabriel Adrian Samfira Date: Tue, 16 Jun 2026 09:04:19 +0300 Subject: [PATCH 2/2] Fix process cleanup and service data races The service read runnerAlive in determineRunnerState, the cli field in loop and Stop, and the connecting/connected channel fields from multiple goroutines without consistent locking. This could cause stale reads, torn pointer values, and a nil dereference if the websocket connected before the runner command was created. runnerAlive is now snapshotted under the lock and passed into determineRunnerState and the status senders. The cli field is owned exclusively by cliMux: keepAliveLoop uses its local after the write, loop reads through getClient, and Stop reads under cliMux. The connecting/connected handshake channels are guarded by a dedicated connMux with snapshot-before-select and two small helpers (connectionUp and connectionDown). loop was restructured from a goto into a nested for loop so the client snapshot does not tangle with label scoping. A reconnect test (TestServiceReconnect) drives the full connect, drop, reconnect cycle against an httptest websocket server under -race. Signed-off-by: Gabriel Adrian Samfira --- service/service.go | 102 ++++++++++++++++++++++++++++------------ service/service_test.go | 74 +++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 30 deletions(-) diff --git a/service/service.go b/service/service.go index 31d2ec8..6b355f8 100644 --- a/service/service.go +++ b/service/service.go @@ -73,6 +73,9 @@ type Service struct { // process exits and orphans the runner. wg sync.WaitGroup + // connMux guards the connecting/connected handshake channels, which are + // closed and re-created from both keepAliveLoop and loop. + connMux sync.Mutex connecting chan struct{} connected chan struct{} @@ -333,9 +336,12 @@ func (s *Service) Stop() error { close(s.done) s.running = false + + s.cliMux.Lock() if s.cli != nil { s.cli.Stop() } + s.cliMux.Unlock() return nil } @@ -600,6 +606,26 @@ retryStart: } } +// connectionUp is called by keepAliveLoop once a websocket connection is +// established: it re-arms the connected channel (so the next select blocks until +// the connection drops) and closes connecting to wake loop(). +func (s *Service) connectionUp() { + s.connMux.Lock() + defer s.connMux.Unlock() + s.connected = make(chan struct{}) + close(s.connecting) +} + +// connectionDown is called by loop() when the connection drops: it re-arms the +// connecting channel and closes connected to wake keepAliveLoop() for a +// reconnect. +func (s *Service) connectionDown() { + s.connMux.Lock() + defer s.connMux.Unlock() + s.connecting = make(chan struct{}) + close(s.connected) +} + func (s *Service) keepAliveLoop() { defer s.wg.Done() var sleepTime time.Duration @@ -610,12 +636,15 @@ retryConnecting: } } for { + s.connMux.Lock() + connected := s.connected + s.connMux.Unlock() select { case <-s.done: return case <-s.ctx.Done(): return - case <-s.connected: + case <-connected: slog.InfoContext(s.ctx, "attempting to connect to GARM server", "server", s.cfg.ServerURL) sleepTime = 5 * time.Second parsed, err := url.ParseRequestURI(s.cfg.ServerURL) @@ -634,13 +663,12 @@ retryConnecting: s.cli = cli s.cliMux.Unlock() - if err := s.cli.Start(); err != nil { + if err := cli.Start(); err != nil { slog.WarnContext(s.ctx, "failed to start websocket connection", "error", err) goto retryConnecting } slog.InfoContext(s.ctx, "successfully connected to GARM", "server", s.cfg.ServerURL) - s.connected = make(chan struct{}) - close(s.connecting) + s.connectionUp() } } } @@ -656,38 +684,52 @@ func (s *Service) loop() { heartbeatTicker.Stop() }() -connecting: - select { - case <-s.done: - return - case <-s.ctx.Done(): - return - case <-s.connecting: - } - // send initial heartbeat - if id, alive, ok := s.snapshot(); ok { - if err := s.sendHeartbeat(id); err != nil { - slog.ErrorContext(s.ctx, "failed to send heartbeat", "error", err) - } - s.sendRunnerStatus(id, alive) - } - for { + // Wait until keepAliveLoop signals that a connection is up. + s.connMux.Lock() + connectingCh := s.connecting + s.connMux.Unlock() select { case <-s.done: return case <-s.ctx.Done(): return - case <-s.cli.Done(): - slog.InfoContext(s.ctx, "remote host closed WS connection") - s.connecting = make(chan struct{}) - close(s.connected) - goto connecting - case <-heartbeatTicker.C: - // send heartbeat - if id, _, ok := s.snapshot(); ok { - if err := s.sendHeartbeat(id); err != nil { - slog.ErrorContext(s.ctx, "failed to send heartbeat", "error", err) + case <-connectingCh: + } + + cli, err := s.getClient() + if err != nil { + // Signalled connected but the client is gone; ask for a reconnect. + slog.ErrorContext(s.ctx, "no websocket client after connect", "error", err) + s.connectionDown() + continue + } + + // send initial heartbeat + if id, alive, ok := s.snapshot(); ok { + if err := s.sendHeartbeat(id); err != nil { + slog.ErrorContext(s.ctx, "failed to send heartbeat", "error", err) + } + s.sendRunnerStatus(id, alive) + } + + online := true + for online { + select { + case <-s.done: + return + case <-s.ctx.Done(): + return + case <-cli.Done(): + slog.InfoContext(s.ctx, "remote host closed WS connection") + s.connectionDown() + online = false + case <-heartbeatTicker.C: + // send heartbeat + if id, _, ok := s.snapshot(); ok { + if err := s.sendHeartbeat(id); err != nil { + slog.ErrorContext(s.ctx, "failed to send heartbeat", "error", err) + } } } } diff --git a/service/service_test.go b/service/service_test.go index 4ecb16b..ca117f0 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -2,11 +2,16 @@ package service import ( "context" + "net/http" + "net/http/httptest" "os" "path/filepath" + "strings" "testing" "time" + "github.com/gorilla/websocket" + "github.com/cloudbase/garm-agent/config" "github.com/cloudbase/garm/params" ) @@ -439,3 +444,72 @@ func indexOf(s, substr string) int { } return -1 } + +// TestServiceReconnect drives the keepAliveLoop/loop connect handshake against a +// real websocket server: it connects, the server drops the connection, and the +// agent must reconnect. Run under -race, it exercises the connecting/connected +// and cli synchronization. +func TestServiceReconnect(t *testing.T) { + upgrader := websocket.Upgrader{} + conns := make(chan *websocket.Conn, 8) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + conns <- c + for { + if _, _, err := c.ReadMessage(); err != nil { + return + } + } + })) + defer srv.Close() + + wsURL := "ws" + strings.TrimPrefix(srv.URL, "http") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s := &Service{ + ctx: ctx, + cfg: &config.Agent{ServerURL: wsURL, Token: "test-token"}, + done: make(chan struct{}), + connecting: make(chan struct{}), + connected: closed, + running: true, + } + s.wg.Add(2) + go s.keepAliveLoop() + go s.loop() + + waitConn := func(what string) *websocket.Conn { + t.Helper() + select { + case c := <-conns: + return c + case <-time.After(5 * time.Second): + cancel() + t.Fatalf("timed out waiting for the agent to %s", what) + return nil + } + } + + c1 := waitConn("connect") + // Drop the connection from the server side; the agent should reconnect. + c1.Close() + waitConn("reconnect") + + // Shut down and make sure both goroutines exit. + cancel() + stopped := make(chan struct{}) + go func() { + s.wg.Wait() + close(stopped) + }() + select { + case <-stopped: + case <-time.After(5 * time.Second): + t.Fatal("service goroutines did not exit after shutdown") + } +}