From 533f61d8fb117eade5aaa70d10b3f12eaed0c844 Mon Sep 17 00:00:00 2001 From: garmr Date: Wed, 25 Feb 2026 17:30:47 -0800 Subject: [PATCH 1/4] feat(ipc): add SSE status event streaming and replace string status with VPNStatus type Introduce a Server-Sent Events endpoint (/status/events) that streams VPN status changes to clients in real time, replacing the previous poll-based approach. Refactor status representation from string constants (StatusRunning, StatusClosed, etc.) to a typed VPNStatus enum (Connected, Disconnected, Connecting, Disconnecting, Restarting, ErrorStatus) and move status emission from the IPC server into the tunnel layer. The tracer middleware is scoped to standard routes so it no longer buffers long-lived SSE connections, and the HTTP transport is upgraded to unencrypted HTTP/2 for multiplexed streaming support. --- telemetry/connections.go | 2 +- vpn/ipc/clash_mode.go | 2 +- vpn/ipc/connections.go | 4 +- vpn/ipc/endpoints.go | 1 + vpn/ipc/events.go | 58 +++++++++++++++++++++++++ vpn/ipc/events_client.go | 94 ++++++++++++++++++++++++++++++++++++++++ vpn/ipc/events_test.go | 88 +++++++++++++++++++++++++++++++++++++ vpn/ipc/group.go | 2 +- vpn/ipc/http.go | 10 ++++- vpn/ipc/outbound.go | 6 +-- vpn/ipc/server.go | 93 ++++++++++++++++++--------------------- vpn/ipc/status.go | 28 +++++------- vpn/service.go | 9 ++-- vpn/tunnel.go | 38 ++++++++++++---- vpn/tunnel_test.go | 6 +-- vpn/vpn.go | 2 +- vpn/vpn_test.go | 10 ++--- 17 files changed, 354 insertions(+), 99 deletions(-) create mode 100644 vpn/ipc/events.go create mode 100644 vpn/ipc/events_client.go create mode 100644 vpn/ipc/events_test.go diff --git a/telemetry/connections.go b/telemetry/connections.go index aba7106f..acc55d1e 100644 --- a/telemetry/connections.go +++ b/telemetry/connections.go @@ -48,7 +48,7 @@ func harvestConnectionMetrics(pollInterval time.Duration) func() { if err != nil { slog.Warn("failed to get service status", "error", err) } - if vpnStatus != ipc.StatusRunning { + if vpnStatus != ipc.Connected { continue } conns, err := ipc.GetConnections(ctx) diff --git a/vpn/ipc/clash_mode.go b/vpn/ipc/clash_mode.go index 8b4a7899..ec0f9e97 100644 --- a/vpn/ipc/clash_mode.go +++ b/vpn/ipc/clash_mode.go @@ -34,7 +34,7 @@ func SetClashMode(ctx context.Context, mode string) error { // clashModeHandler handles HTTP requests for getting or setting the Clash server mode. func (s *Server) clashModeHandler(w http.ResponseWriter, req *http.Request) { span := trace.SpanFromContext(req.Context()) - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusServiceUnavailable) return } diff --git a/vpn/ipc/connections.go b/vpn/ipc/connections.go index 911f69c3..125c8017 100644 --- a/vpn/ipc/connections.go +++ b/vpn/ipc/connections.go @@ -19,7 +19,7 @@ func CloseConnections(ctx context.Context, connIDs []string) error { } func (s *Server) closeConnectionHandler(w http.ResponseWriter, r *http.Request) { - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusServiceUnavailable) return } @@ -54,7 +54,7 @@ func GetConnections(ctx context.Context) ([]Connection, error) { } func (s *Server) connectionsHandler(w http.ResponseWriter, r *http.Request) { - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusServiceUnavailable) return } diff --git a/vpn/ipc/endpoints.go b/vpn/ipc/endpoints.go index 7aaad2a5..4a83aaf1 100644 --- a/vpn/ipc/endpoints.go +++ b/vpn/ipc/endpoints.go @@ -13,4 +13,5 @@ const ( connectionsEndpoint = "/connections" closeConnectionsEndpoint = "/connections/close" setSettingsPathEndpoint = "/set" + statusEventsEndpoint = "/status/events" ) diff --git a/vpn/ipc/events.go b/vpn/ipc/events.go new file mode 100644 index 00000000..f9ca27ad --- /dev/null +++ b/vpn/ipc/events.go @@ -0,0 +1,58 @@ +package ipc + +import ( + "encoding/json" + "fmt" + "log/slog" + "net/http" + + "github.com/getlantern/radiance/events" +) + +// StatusUpdateEvent is emitted when the VPN status changes. +type StatusUpdateEvent struct { + events.Event + Status VPNStatus `json:"status"` + Error string `json:"error,omitempty"` +} + +func (s *Server) statusEventsHandler(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + ch := make(chan StatusUpdateEvent, 8) + + // Send the current status immediately so the client doesn't have to wait for a change. + ch <- StatusUpdateEvent{Status: s.service.Status()} + + sub := events.Subscribe(func(evt StatusUpdateEvent) { + select { + case ch <- evt: + default: // drop if client is slow + } + }) + defer sub.Unsubscribe() + + for { + select { + case evt := <-ch: + buf, err := json.Marshal(evt) + if err != nil { + slog.Error("failed to marshal event", "error", err) + continue + } + fmt.Fprintf(w, "%s\r\n", buf) + flusher.Flush() + case <-r.Context().Done(): + slog.Debug("client disconnected") + return + } + } +} diff --git a/vpn/ipc/events_client.go b/vpn/ipc/events_client.go new file mode 100644 index 00000000..50b7666d --- /dev/null +++ b/vpn/ipc/events_client.go @@ -0,0 +1,94 @@ +package ipc + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/getlantern/radiance/events" +) + +// StartStatusStream starts streaming status updates from the server and emits received +// [StatusUpdateEvent] events until the context is cancelled. If waitForConnect is true, it +// polls in a background goroutine until the server is reachable. When the stream is lost +// (server restart, network error, clean EOF), a [StatusUpdateEvent] with [Disconnected] status +// is emitted. The retry loop continues until the context is cancelled or a non-recoverable error +// occurs (e.g. connection refused, invalid response). +func StartStatusStream(ctx context.Context, waitForConnect bool) error { + if !waitForConnect { + return startStream(ctx) + } + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(1 * time.Second): + serverListening, err := tryDial(ctx) + if err != nil { + events.Emit(StatusUpdateEvent{ + Status: ErrorStatus, + Error: fmt.Sprintf("connection error: %v", err), + }) + return + } + if !serverListening { + continue // we started trying to connect before the server is ready + } + err = startStream(ctx) + if ctx.Err() != nil { + return + } + evt := StatusUpdateEvent{Status: Disconnected} + if err != nil { + slog.Warn("status stream disconnected", "error", err) + evt.Error = fmt.Sprintf("stream disconnected: %v", err) + } + // Stream ended cleanly (EOF) — server likely shut down. + events.Emit(evt) + return + } + } + }() + return nil +} + +func startStream(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, "GET", apiURL+statusEventsEndpoint, nil) + if err != nil { + return fmt.Errorf("creating request: %w", err) + } + client := &http.Client{ + Transport: &http.Transport{ + DialContext: dialContext, + Protocols: protocols, + }, + } + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("connecting: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status %s", resp.Status) + } + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + continue + } + var evt StatusUpdateEvent + if err := json.Unmarshal([]byte(line), &evt); err != nil { + continue + } + events.Emit(evt) + } + return scanner.Err() +} diff --git a/vpn/ipc/events_test.go b/vpn/ipc/events_test.go new file mode 100644 index 00000000..4a7a6102 --- /dev/null +++ b/vpn/ipc/events_test.go @@ -0,0 +1,88 @@ +package ipc + +import ( + "bytes" + "context" + "encoding/json" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/sagernet/sing-box/experimental/clashapi" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/getlantern/radiance/events" +) + +func TestStatusEventsHandler(t *testing.T) { + svc := newMockService() + s := &Server{service: svc} + + rec := httptest.NewRecorder() + req := httptest.NewRequest("GET", statusEventsEndpoint, nil) + + done := make(chan struct{}) + go func() { + defer close(done) + s.statusEventsHandler(rec, req) + }() + + waitAssert := func(want StatusUpdateEvent, msg string) { + require.Eventually(t, func() bool { + return strings.Contains(rec.Body.String(), "\r\n") + }, time.Second, 10*time.Millisecond, msg) + evt := parseEventLine(t, rec.Body) + rec.Body.Reset() + assert.Equal(t, want, evt, msg) + } + waitAssert(StatusUpdateEvent{Status: Disconnected}, "initial event not received") + + // Emit a status change and wait for it to arrive. + evt := StatusUpdateEvent{Status: Connected} + events.Emit(evt) + waitAssert(evt, "connected event not received") + + // Emit an error status + evt = StatusUpdateEvent{Status: ErrorStatus, Error: "something went wrong"} + events.Emit(evt) + waitAssert(evt, "error event not received") + + // Cancel the service context — handler should return. + svc.Close() + select { + case <-done: + case <-time.After(time.Second): + require.Fail(t, "handler did not return after service context cancellation") + } +} + +func parseEventLine(t *testing.T, body *bytes.Buffer) StatusUpdateEvent { + line, err := body.ReadBytes('\n') + require.NoError(t, err) + + var evt StatusUpdateEvent + line = bytes.TrimSpace(line) + require.NoError(t, json.Unmarshal(line, &evt)) + return evt +} + +type mockService struct { + ctx context.Context + cancel context.CancelFunc + status VPNStatus +} + +func newMockService() *mockService { + ctx, cancel := context.WithCancel(context.Background()) + return &mockService{ctx: ctx, cancel: cancel, status: Disconnected} +} + +func (m *mockService) Ctx() context.Context { return m.ctx } +func (m *mockService) Status() VPNStatus { return m.status } +func (m *mockService) Start(context.Context, string, string) error { return nil } +func (m *mockService) Restart(context.Context) error { return nil } +func (m *mockService) ClashServer() *clashapi.Server { return nil } +func (m *mockService) Close() error { m.cancel(); return nil } diff --git a/vpn/ipc/group.go b/vpn/ipc/group.go index 7f0d9e52..48ede66a 100644 --- a/vpn/ipc/group.go +++ b/vpn/ipc/group.go @@ -16,7 +16,7 @@ func GetGroups(ctx context.Context) ([]OutboundGroup, error) { } func (s *Server) groupHandler(w http.ResponseWriter, r *http.Request) { - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusServiceUnavailable) return } diff --git a/vpn/ipc/http.go b/vpn/ipc/http.go index f2af307d..3167a559 100644 --- a/vpn/ipc/http.go +++ b/vpn/ipc/http.go @@ -19,6 +19,12 @@ import ( const tracerName = "github.com/getlantern/radiance/vpn/ipc" +var protocols = func() *http.Protocols { + p := &http.Protocols{} + p.SetUnencryptedHTTP2(true) + return p +}() + // empty is a placeholder type for requests that do not expect a response body. type empty struct{} @@ -40,7 +46,9 @@ func sendRequest[T any](ctx context.Context, method, endpoint string, data any) } client := &http.Client{ Transport: &http.Transport{ - DialContext: dialContext, + DialContext: dialContext, + Protocols: protocols, + ForceAttemptHTTP2: true, }, } resp, err := client.Do(req) diff --git a/vpn/ipc/outbound.go b/vpn/ipc/outbound.go index 715a6c8f..fc8a2817 100644 --- a/vpn/ipc/outbound.go +++ b/vpn/ipc/outbound.go @@ -29,7 +29,7 @@ func SelectOutbound(ctx context.Context, groupTag, outboundTag string) error { } func (s *Server) selectHandler(w http.ResponseWriter, r *http.Request) { - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusServiceUnavailable) return } @@ -89,7 +89,7 @@ func GetSelected(ctx context.Context) (group, tag string, err error) { } func (s *Server) selectedHandler(w http.ResponseWriter, r *http.Request) { - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusServiceUnavailable) return } @@ -122,7 +122,7 @@ func GetActiveOutbound(ctx context.Context) (group, tag string, err error) { } func (s *Server) activeOutboundHandler(w http.ResponseWriter, r *http.Request) { - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusServiceUnavailable) return } diff --git a/vpn/ipc/server.go b/vpn/ipc/server.go index eb7e8c33..f88ea066 100644 --- a/vpn/ipc/server.go +++ b/vpn/ipc/server.go @@ -1,4 +1,5 @@ // Package ipc implements the IPC server for communicating between the client and the VPN service. + // It provides HTTP endpoints for retrieving statistics, managing groups, selecting outbounds, // changing modes, and closing connections. package ipc @@ -19,7 +20,6 @@ import ( "go.opentelemetry.io/otel" "github.com/getlantern/radiance/common" - "github.com/getlantern/radiance/events" ) var ( @@ -30,7 +30,7 @@ var ( // Service defines the interface that the IPC server uses to interact with the underlying VPN service. type Service interface { Ctx() context.Context - Status() string + Status() VPNStatus Start(ctx context.Context, group, tag string) error Restart(ctx context.Context) error ClashServer() *clashapi.Server @@ -40,28 +40,21 @@ type Service interface { // Server represents the IPC server that communicates over a Unix domain socket for Unix-like // systems, and a named pipe for Windows. type Server struct { - svr *http.Server - service Service - router chi.Router - vpnStatus atomic.Value // string - closed atomic.Bool -} - -// StatusUpdateEvent is emitted when the VPN status changes. -type StatusUpdateEvent struct { - events.Event - Status VPNStatus - Error error + svr *http.Server + service Service + router chi.Router + closed atomic.Bool } type VPNStatus string // Possible VPN statuses const ( - Connected VPNStatus = "connected" - Disconnected VPNStatus = "disconnected" Connecting VPNStatus = "connecting" + Connected VPNStatus = "connected" Disconnecting VPNStatus = "disconnecting" + Disconnected VPNStatus = "disconnected" + Restarting VPNStatus = "restarting" ErrorStatus VPNStatus = "error" ) @@ -75,8 +68,7 @@ func NewServer(service Service) *Server { service: service, router: chi.NewMux(), } - s.vpnStatus.Store(Disconnected) - s.router.Use(log, tracer) + s.router.Use(log) // Only add auth middleware if not running on mobile, since mobile platforms have their own // sandboxing and permission models. @@ -85,28 +77,38 @@ func NewServer(service Service) *Server { s.router.Use(authPeer) } - s.router.Get("/", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) + // Standard routes use the tracer middleware which buffers response bodies for error recording. + s.router.Group(func(r chi.Router) { + r.Use(tracer) + r.Get("/", func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusOK) + }) + r.Get(statusEndpoint, s.statusHandler) + r.Get(metricsEndpoint, s.metricsHandler) + r.Get(groupsEndpoint, s.groupHandler) + r.Get(connectionsEndpoint, s.connectionsHandler) + r.Get(selectEndpoint, s.selectedHandler) + r.Get(activeEndpoint, s.activeOutboundHandler) + r.Post(selectEndpoint, s.selectHandler) + r.Get(clashModeEndpoint, s.clashModeHandler) + r.Post(clashModeEndpoint, s.clashModeHandler) + r.Post(startServiceEndpoint, s.startServiceHandler) + r.Post(stopServiceEndpoint, s.stopServiceHandler) + r.Post(restartServiceEndpoint, s.restartServiceHandler) + r.Post(closeConnectionsEndpoint, s.closeConnectionHandler) + r.Post(setSettingsPathEndpoint, s.setSettingsPathHandler) }) - s.router.Get(statusEndpoint, s.statusHandler) - s.router.Get(metricsEndpoint, s.metricsHandler) - s.router.Get(groupsEndpoint, s.groupHandler) - s.router.Get(connectionsEndpoint, s.connectionsHandler) - s.router.Get(selectEndpoint, s.selectedHandler) - s.router.Get(activeEndpoint, s.activeOutboundHandler) - s.router.Post(selectEndpoint, s.selectHandler) - s.router.Get(clashModeEndpoint, s.clashModeHandler) - s.router.Post(clashModeEndpoint, s.clashModeHandler) - s.router.Post(startServiceEndpoint, s.startServiceHandler) - s.router.Post(stopServiceEndpoint, s.stopServiceHandler) - s.router.Post(restartServiceEndpoint, s.restartServiceHandler) - s.router.Post(closeConnectionsEndpoint, s.closeConnectionHandler) - s.router.Post(setSettingsPathEndpoint, s.setSettingsPathHandler) + + // SSE routes skip the tracer middleware since it buffers the entire response body + // and holds the span open for the lifetime of the connection. + s.router.Get(statusEventsEndpoint, s.statusEventsHandler) svr := &http.Server{ - Handler: s.router, - ReadTimeout: time.Second * 5, - WriteTimeout: time.Second * 5, + Handler: s.router, + ReadTimeout: time.Second * 5, + // WriteTimeout is 0 (unlimited) to support long-lived SSE connections. + // Non-streaming handlers return quickly so this is safe. + Protocols: protocols, } if addAuth { svr.ConnContext = func(ctx context.Context, c net.Conn) context.Context { @@ -137,10 +139,9 @@ func (s *Server) Start() error { slog.Error("IPC server", "error", err) } s.closed.Store(true) - if s.service.Status() != StatusClosed { + if s.service.Status() != Disconnected { slog.Warn("IPC server stopped unexpectedly, closing service") s.service.Close() - s.setVPNStatus(ErrorStatus, errors.New("IPC server stopped unexpectedly")) } }() @@ -178,7 +179,7 @@ func (s *Server) startServiceHandler(w http.ResponseWriter, r *http.Request) { ctx, span := otel.Tracer(tracerName).Start(r.Context(), "ipc.Server.StartService") defer span.End() // check if service is already running - if s.service.Status() != StatusClosed { + if s.service.Status() != Disconnected { w.WriteHeader(http.StatusOK) return } @@ -189,17 +190,14 @@ func (s *Server) startServiceHandler(w http.ResponseWriter, r *http.Request) { } if err := s.service.Start(ctx, p.GroupTag, p.OutboundTag); err != nil { - s.setVPNStatus(ErrorStatus, err) http.Error(w, err.Error(), http.StatusServiceUnavailable) return } - s.setVPNStatus(Connected, nil) w.WriteHeader(http.StatusOK) } func (s *Server) stopServiceHandler(w http.ResponseWriter, r *http.Request) { slog.Debug("Received request to stop service via IPC") - defer s.setVPNStatus(Disconnected, nil) if err := s.service.Close(); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -216,20 +214,13 @@ func (s *Server) restartServiceHandler(w http.ResponseWriter, r *http.Request) { ctx, span := otel.Tracer(tracerName).Start(r.Context(), "ipc.Server.restartServiceHandler") defer span.End() - if s.service.Status() != StatusRunning { + if s.service.Status() != Connected { http.Error(w, ErrServiceIsNotReady.Error(), http.StatusInternalServerError) return } - s.vpnStatus.Store(Disconnected) if err := s.service.Restart(ctx); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - s.setVPNStatus(Connected, nil) w.WriteHeader(http.StatusOK) } - -func (s *Server) setVPNStatus(status VPNStatus, err error) { - s.vpnStatus.Store(status) - events.Emit(StatusUpdateEvent{Status: status, Error: err}) -} diff --git a/vpn/ipc/status.go b/vpn/ipc/status.go index fa7b8e1c..82b6c71e 100644 --- a/vpn/ipc/status.go +++ b/vpn/ipc/status.go @@ -16,14 +16,6 @@ import ( "go.opentelemetry.io/otel/trace" ) -const ( - StatusInitializing = "initializing" - StatusConnecting = "connecting" - StatusRunning = "running" - StatusClosing = "closing" - StatusClosed = "closed" -) - // Metrics represents the runtime metrics of the service. type Metrics struct { Memory uint64 @@ -51,7 +43,7 @@ func (s *Server) metricsHandler(w http.ResponseWriter, r *http.Request) { Goroutines: runtime.NumGoroutine(), Connections: conntrack.Count(), } - if s.service.Status() == StatusRunning { + if s.service.Status() == Connected { up, down := s.service.ClashServer().TrafficManager().Total() stats.UplinkTotal, stats.DownlinkTotal = up, down } @@ -62,25 +54,25 @@ func (s *Server) metricsHandler(w http.ResponseWriter, r *http.Request) { } } -type state struct { - State string `json:"state"` +type vpnStatus struct { + Status VPNStatus `json:"status"` } // GetStatus retrieves the current status of the service. -func GetStatus(ctx context.Context) (string, error) { +func GetStatus(ctx context.Context) (VPNStatus, error) { // try to dial first to check if IPC server is even running and avoid waiting for timeout if canDial, err := tryDial(ctx); !canDial { - return StatusClosed, err + return Disconnected, err } - res, err := sendRequest[state](ctx, "GET", statusEndpoint, nil) + res, err := sendRequest[vpnStatus](ctx, "GET", statusEndpoint, nil) if errors.Is(err, ErrIPCNotRunning) || errors.Is(err, ErrServiceIsNotReady) { - return StatusClosed, nil + return Disconnected, nil } if err != nil { return "", fmt.Errorf("error getting status: %w", err) } - return res.State, nil + return res.Status, nil } func tryDial(ctx context.Context) (bool, error) { @@ -98,9 +90,9 @@ func tryDial(ctx context.Context) (bool, error) { func (s *Server) statusHandler(w http.ResponseWriter, r *http.Request) { span := trace.SpanFromContext(r.Context()) status := s.service.Status() - span.SetAttributes(attribute.String("status", status)) + span.SetAttributes(attribute.String("status", status.String())) w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(state{status}); err != nil { + if err := json.NewEncoder(w).Encode(status); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/vpn/service.go b/vpn/service.go index cfb5cec6..16e69a98 100644 --- a/vpn/service.go +++ b/vpn/service.go @@ -137,12 +137,13 @@ func (s *TunnelService) Restart(ctx context.Context) error { s.mu.Unlock() return errors.New("tunnel not started") } - if s.tunnel.Status() != ipc.StatusRunning { + if s.tunnel.Status() != ipc.Connected { s.mu.Unlock() return errors.New("tunnel not running") } s.logger.Info("Restarting tunnel") + s.tunnel.setStatus(ipc.Restarting, nil) if s.platformIfce != nil { s.mu.Unlock() if err := s.platformIfce.RestartService(); err != nil { @@ -165,13 +166,13 @@ func (s *TunnelService) Restart(ctx context.Context) error { } // Status returns the current status of the tunnel (e.g., running, closed). -func (s *TunnelService) Status() string { +func (s *TunnelService) Status() ipc.VPNStatus { s.mu.Lock() defer s.mu.Unlock() if s.tunnel == nil { - return ipc.StatusClosed + return ipc.Disconnected } - return s.tunnel.Status() + return ipc.VPNStatus(s.tunnel.Status()) } // Ctx returns the context associated with the tunnel, or nil if no tunnel is running. diff --git a/vpn/tunnel.go b/vpn/tunnel.go index 4c650eec..f288d3b7 100644 --- a/vpn/tunnel.go +++ b/vpn/tunnel.go @@ -24,6 +24,7 @@ import ( "github.com/getlantern/radiance/common" "github.com/getlantern/radiance/common/settings" + "github.com/getlantern/radiance/events" "github.com/getlantern/radiance/internal" "github.com/getlantern/radiance/servers" "github.com/getlantern/radiance/vpn/ipc" @@ -55,14 +56,21 @@ type tunnel struct { clientContextTracker *clientcontext.ClientContextInjector - status atomic.Value + status atomic.Value // ipc.VPNStatus cancel context.CancelFunc closers []io.Closer } -func (t *tunnel) start(opts O.Options, platformIfce libbox.PlatformInterface) error { - t.status.Store(ipc.StatusInitializing) +func (t *tunnel) start(opts O.Options, platformIfce libbox.PlatformInterface) (err error) { + if t.status.Load() != ipc.Restarting { + t.setStatus(ipc.Connecting, nil) + } t.ctx, t.cancel = context.WithCancel(box.BaseContext()) + defer func() { + if err != nil { + t.setStatus(ipc.ErrorStatus, err) + } + }() if err := t.init(opts, platformIfce); err != nil { t.close() @@ -75,7 +83,7 @@ func (t *tunnel) start(opts O.Options, platformIfce libbox.PlatformInterface) er slog.Error("Failed to connect tunnel", "error", err) return fmt.Errorf("connecting tunnel: %w", err) } - t.status.Store(ipc.StatusRunning) + t.setStatus(ipc.Connected, nil) t.optsMap = makeOutboundOptsMap(t.ctx, opts.Outbounds, opts.Endpoints) return nil } @@ -245,7 +253,7 @@ func (t *tunnel) connect() (err error) { func (t *tunnel) selectOutbound(group, tag string) error { t.reloadAccess.Lock() defer t.reloadAccess.Unlock() - if status := t.Status(); status != ipc.StatusRunning { + if status := t.Status(); status != ipc.Connected { return fmt.Errorf("tunnel not running: status %v", status) } @@ -263,6 +271,9 @@ func (t *tunnel) selectOutbound(group, tag string) error { } func (t *tunnel) close() error { + if t.status.Load() != ipc.Restarting { + t.setStatus(ipc.Disconnecting, nil) + } if t.cancel != nil { t.cancel() } @@ -285,12 +296,23 @@ func (t *tunnel) close() error { t.closers = nil t.lbService = nil - t.status.Store(ipc.StatusClosed) + if t.status.Load() != ipc.Restarting { + t.setStatus(ipc.Disconnected, nil) + } return err } -func (t *tunnel) Status() string { - return t.status.Load().(string) +func (t *tunnel) Status() ipc.VPNStatus { + return t.status.Load().(ipc.VPNStatus) +} + +func (t *tunnel) setStatus(status ipc.VPNStatus, err error) { + t.status.Store(status) + evt := ipc.StatusUpdateEvent{Status: status} + if err != nil { + evt.Error = err.Error() + } + events.Emit(evt) } var errLibboxClosed = errors.New("libbox closed") diff --git a/vpn/tunnel_test.go b/vpn/tunnel_test.go index b7940e24..e30dd9a7 100644 --- a/vpn/tunnel_test.go +++ b/vpn/tunnel_test.go @@ -41,11 +41,11 @@ func TestConnection(t *testing.T) { tun.close() }) - require.Equal(t, ipc.StatusRunning, tun.Status(), "tunnel should be running") + require.Equal(t, ipc.Connected, tun.Status(), "tunnel should be running") assert.NoError(t, tun.selectOutbound("http", "http1-out"), "failed to select http outbound") assert.NoError(t, tun.close(), "failed to close lbService") - assert.Equal(t, ipc.StatusClosed, tun.Status(), "tun should be closed") + assert.Equal(t, ipc.Disconnected, tun.Status(), "tun should be closed") } func TestUpdateServers(t *testing.T) { @@ -153,6 +153,6 @@ func testConnection(t *testing.T, opts sbO.Options) *tunnel { tun.close() }) - assert.Equal(t, ipc.StatusRunning, tun.Status(), "tunnel should be running") + assert.Equal(t, ipc.Connected, tun.Status(), "tunnel should be running") return tun } diff --git a/vpn/vpn.go b/vpn/vpn.go index 5aaed227..f9d1a007 100644 --- a/vpn/vpn.go +++ b/vpn/vpn.go @@ -118,7 +118,7 @@ func isOpen(ctx context.Context) bool { if err != nil { slog.Error("Failed to get tunnel state", "error", err) } - return state == ipc.StatusRunning + return state == ipc.Connected } // Disconnect closes the tunnel and all active connections. diff --git a/vpn/vpn_test.go b/vpn/vpn_test.go index 339dea00..1396c50f 100644 --- a/vpn/vpn_test.go +++ b/vpn/vpn_test.go @@ -63,7 +63,7 @@ func TestSelectServer(t *testing.T) { selector := outbound.(_selector) require.NoError(t, selector.Start(), "failed to start selector") - mservice.status = ipc.StatusRunning + mservice.status = ipc.Connected require.NoError(t, selectServer(context.Background(), tt.wantGroup, tt.wantTag)) assert.Equal(t, tt.wantTag, selector.Now(), tt.wantTag+" should be selected") assert.Equal(t, tt.wantGroup, clashServer.Mode(), "clash mode should be "+tt.wantGroup) @@ -131,7 +131,7 @@ func TestAutoServerSelections(t *testing.T) { service.MustRegister[adapter.OutboundManager](ctx, mgr) m := &mockService{ ctx: ctx, - status: ipc.StatusRunning, + status: ipc.Connected, } ipcServer := ipc.NewServer(m) require.NoError(t, ipcServer.Start()) @@ -181,12 +181,12 @@ var _ ipc.Service = (*mockService)(nil) type mockService struct { ctx context.Context - status string + status ipc.VPNStatus clash *clashapi.Server } func (m *mockService) Ctx() context.Context { return m.ctx } -func (m *mockService) Status() string { return m.status } +func (m *mockService) Status() ipc.VPNStatus { return m.status } func (m *mockService) ClashServer() *clashapi.Server { return m.clash } func (m *mockService) Close() error { return nil } func (m *mockService) Start(ctx context.Context, group, tag string) error { return nil } @@ -213,7 +213,7 @@ func setupVpnTest(t *testing.T) *mockService { m := &mockService{ ctx: ctx, - status: ipc.StatusRunning, + status: ipc.Connected, clash: clashServer.(*clashapi.Server), } ipcServer := ipc.NewServer(m) From d25584748a44e31512bea60a8b8f7e992826e93a Mon Sep 17 00:00:00 2001 From: garmr Date: Wed, 25 Feb 2026 17:45:40 -0800 Subject: [PATCH 2/4] fix tests --- vpn/ipc/events_test.go | 21 +++------------------ vpn/ipc/status.go | 2 +- 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/vpn/ipc/events_test.go b/vpn/ipc/events_test.go index 4a7a6102..15bda173 100644 --- a/vpn/ipc/events_test.go +++ b/vpn/ipc/events_test.go @@ -18,7 +18,7 @@ import ( ) func TestStatusEventsHandler(t *testing.T) { - svc := newMockService() + svc := &mockService{status: Disconnected} s := &Server{service: svc} rec := httptest.NewRecorder() @@ -49,14 +49,6 @@ func TestStatusEventsHandler(t *testing.T) { evt = StatusUpdateEvent{Status: ErrorStatus, Error: "something went wrong"} events.Emit(evt) waitAssert(evt, "error event not received") - - // Cancel the service context — handler should return. - svc.Close() - select { - case <-done: - case <-time.After(time.Second): - require.Fail(t, "handler did not return after service context cancellation") - } } func parseEventLine(t *testing.T, body *bytes.Buffer) StatusUpdateEvent { @@ -70,19 +62,12 @@ func parseEventLine(t *testing.T, body *bytes.Buffer) StatusUpdateEvent { } type mockService struct { - ctx context.Context - cancel context.CancelFunc status VPNStatus } -func newMockService() *mockService { - ctx, cancel := context.WithCancel(context.Background()) - return &mockService{ctx: ctx, cancel: cancel, status: Disconnected} -} - -func (m *mockService) Ctx() context.Context { return m.ctx } +func (m *mockService) Ctx() context.Context { return nil } func (m *mockService) Status() VPNStatus { return m.status } func (m *mockService) Start(context.Context, string, string) error { return nil } func (m *mockService) Restart(context.Context) error { return nil } func (m *mockService) ClashServer() *clashapi.Server { return nil } -func (m *mockService) Close() error { m.cancel(); return nil } +func (m *mockService) Close() error { return nil } diff --git a/vpn/ipc/status.go b/vpn/ipc/status.go index 82b6c71e..004bd7b7 100644 --- a/vpn/ipc/status.go +++ b/vpn/ipc/status.go @@ -92,7 +92,7 @@ func (s *Server) statusHandler(w http.ResponseWriter, r *http.Request) { status := s.service.Status() span.SetAttributes(attribute.String("status", status.String())) w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(status); err != nil { + if err := json.NewEncoder(w).Encode(vpnStatus{Status: status}); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } From 5867baf5bac200831a8b908d8322ba2d682745af Mon Sep 17 00:00:00 2001 From: garmr Date: Wed, 25 Feb 2026 17:53:02 -0800 Subject: [PATCH 3/4] address pr comments --- vpn/ipc/events_client.go | 4 ++-- vpn/ipc/status.go | 2 +- vpn/service.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/vpn/ipc/events_client.go b/vpn/ipc/events_client.go index 50b7666d..9af92b1a 100644 --- a/vpn/ipc/events_client.go +++ b/vpn/ipc/events_client.go @@ -16,8 +16,8 @@ import ( // [StatusUpdateEvent] events until the context is cancelled. If waitForConnect is true, it // polls in a background goroutine until the server is reachable. When the stream is lost // (server restart, network error, clean EOF), a [StatusUpdateEvent] with [Disconnected] status -// is emitted. The retry loop continues until the context is cancelled or a non-recoverable error -// occurs (e.g. connection refused, invalid response). +// is emitted. The retry loop continues until a connection is established, the context is cancelled, +// or a non-recoverable error occurs (e.g. connection refused, invalid response). func StartStatusStream(ctx context.Context, waitForConnect bool) error { if !waitForConnect { return startStream(ctx) diff --git a/vpn/ipc/status.go b/vpn/ipc/status.go index 004bd7b7..28f6087b 100644 --- a/vpn/ipc/status.go +++ b/vpn/ipc/status.go @@ -70,7 +70,7 @@ func GetStatus(ctx context.Context) (VPNStatus, error) { return Disconnected, nil } if err != nil { - return "", fmt.Errorf("error getting status: %w", err) + return ErrorStatus, fmt.Errorf("error getting status: %w", err) } return res.Status, nil } diff --git a/vpn/service.go b/vpn/service.go index 16e69a98..57cf45c2 100644 --- a/vpn/service.go +++ b/vpn/service.go @@ -172,7 +172,7 @@ func (s *TunnelService) Status() ipc.VPNStatus { if s.tunnel == nil { return ipc.Disconnected } - return ipc.VPNStatus(s.tunnel.Status()) + return s.tunnel.Status() } // Ctx returns the context associated with the tunnel, or nil if no tunnel is running. From d6187c1b6c930150bbc1b419e405d6db9b8e0557 Mon Sep 17 00:00:00 2001 From: garmr Date: Fri, 27 Feb 2026 12:46:11 -0800 Subject: [PATCH 4/4] fix test --- vpn/ipc/events_test.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/vpn/ipc/events_test.go b/vpn/ipc/events_test.go index 15bda173..585804ff 100644 --- a/vpn/ipc/events_test.go +++ b/vpn/ipc/events_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/getlantern/radiance/events" + "github.com/getlantern/radiance/servers" ) func TestStatusEventsHandler(t *testing.T) { @@ -65,9 +66,12 @@ type mockService struct { status VPNStatus } -func (m *mockService) Ctx() context.Context { return nil } -func (m *mockService) Status() VPNStatus { return m.status } -func (m *mockService) Start(context.Context, string, string) error { return nil } -func (m *mockService) Restart(context.Context) error { return nil } -func (m *mockService) ClashServer() *clashapi.Server { return nil } -func (m *mockService) Close() error { return nil } +func (m *mockService) Ctx() context.Context { return nil } +func (m *mockService) Status() VPNStatus { return m.status } +func (m *mockService) Start(context.Context, string) error { return nil } +func (m *mockService) Restart(context.Context, string) error { return nil } +func (m *mockService) ClashServer() *clashapi.Server { return nil } +func (m *mockService) Close() error { return nil } +func (m *mockService) UpdateOutbounds(options servers.Servers) error { return nil } +func (m *mockService) AddOutbounds(group string, options servers.Options) error { return nil } +func (m *mockService) RemoveOutbounds(group string, tags []string) error { return nil }