Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions vpn/ipc/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ const (
connectionsEndpoint = "/connections"
closeConnectionsEndpoint = "/connections/close"
setSettingsPathEndpoint = "/set"
statusEventsEndpoint = "/status/events"
)
58 changes: 58 additions & 0 deletions vpn/ipc/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package ipc

import (
"encoding/json"
"fmt"
"log/slog"
"net/http"

"github.com/getlantern/radiance/events"
)

// StatusUpdateEvent is emitted when the VPN status changes.
type StatusUpdateEvent struct {
events.Event
Status VPNStatus `json:"status"`
Error string `json:"error,omitempty"`
}

func (s *Server) statusEventsHandler(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")

ch := make(chan StatusUpdateEvent, 8)

// Send the current status immediately so the client doesn't have to wait for a change.
ch <- StatusUpdateEvent{Status: s.service.Status()}

sub := events.Subscribe(func(evt StatusUpdateEvent) {
select {
case ch <- evt:
default: // drop if client is slow
}
})
defer sub.Unsubscribe()

for {
select {
case evt := <-ch:
buf, err := json.Marshal(evt)
if err != nil {
slog.Error("failed to marshal event", "error", err)
continue
}
fmt.Fprintf(w, "%s\r\n", buf)
flusher.Flush()
case <-r.Context().Done():
slog.Debug("client disconnected")
return
}
}
}
94 changes: 94 additions & 0 deletions vpn/ipc/events_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package ipc

import (
"bufio"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"time"

"github.com/getlantern/radiance/events"
)

// StartStatusStream starts streaming status updates from the server and emits received
// [StatusUpdateEvent] events until the context is cancelled. If waitForConnect is true, it
// polls in a background goroutine until the server is reachable. When the stream is lost
// (server restart, network error, clean EOF), a [StatusUpdateEvent] with [Disconnected] status
// is emitted. The retry loop continues until a connection is established, the context is cancelled,
// or a non-recoverable error occurs (e.g. connection refused, invalid response).
func StartStatusStream(ctx context.Context, waitForConnect bool) error {
if !waitForConnect {
return startStream(ctx)
}
go func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
serverListening, err := tryDial(ctx)
if err != nil {
events.Emit(StatusUpdateEvent{
Status: ErrorStatus,
Error: fmt.Sprintf("connection error: %v", err),
})
return
}
if !serverListening {
continue // we started trying to connect before the server is ready
}
err = startStream(ctx)
if ctx.Err() != nil {
return
}
evt := StatusUpdateEvent{Status: Disconnected}
if err != nil {
slog.Warn("status stream disconnected", "error", err)
evt.Error = fmt.Sprintf("stream disconnected: %v", err)
}
// Stream ended cleanly (EOF) — server likely shut down.
events.Emit(evt)
return
}
}
}()
return nil
}

func startStream(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", apiURL+statusEventsEndpoint, nil)
if err != nil {
return fmt.Errorf("creating request: %w", err)
}
client := &http.Client{
Transport: &http.Transport{
DialContext: dialContext,
Protocols: protocols,
},
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("connecting: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status %s", resp.Status)
}

scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
var evt StatusUpdateEvent
if err := json.Unmarshal([]byte(line), &evt); err != nil {
continue
}
events.Emit(evt)
}
return scanner.Err()
}
77 changes: 77 additions & 0 deletions vpn/ipc/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package ipc

import (
"bytes"
"context"
"encoding/json"
"net/http/httptest"
"strings"
"testing"
"time"

"github.com/sagernet/sing-box/experimental/clashapi"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/getlantern/radiance/events"
"github.com/getlantern/radiance/servers"
)

func TestStatusEventsHandler(t *testing.T) {
svc := &mockService{status: Disconnected}
s := &Server{service: svc}

rec := httptest.NewRecorder()
req := httptest.NewRequest("GET", statusEventsEndpoint, nil)

done := make(chan struct{})
go func() {
defer close(done)
s.statusEventsHandler(rec, req)
}()

waitAssert := func(want StatusUpdateEvent, msg string) {
require.Eventually(t, func() bool {
return strings.Contains(rec.Body.String(), "\r\n")
}, time.Second, 10*time.Millisecond, msg)
evt := parseEventLine(t, rec.Body)
rec.Body.Reset()
assert.Equal(t, want, evt, msg)
}
waitAssert(StatusUpdateEvent{Status: Disconnected}, "initial event not received")

// Emit a status change and wait for it to arrive.
evt := StatusUpdateEvent{Status: Connected}
events.Emit(evt)
waitAssert(evt, "connected event not received")

// Emit an error status
evt = StatusUpdateEvent{Status: ErrorStatus, Error: "something went wrong"}
events.Emit(evt)
waitAssert(evt, "error event not received")
}

func parseEventLine(t *testing.T, body *bytes.Buffer) StatusUpdateEvent {
line, err := body.ReadBytes('\n')
require.NoError(t, err)

var evt StatusUpdateEvent
line = bytes.TrimSpace(line)
require.NoError(t, json.Unmarshal(line, &evt))
return evt
}

type mockService struct {
status VPNStatus
}

func (m *mockService) Ctx() context.Context { return nil }
func (m *mockService) Status() VPNStatus { return m.status }
func (m *mockService) Start(context.Context, string) error { return nil }
func (m *mockService) Restart(context.Context, string) error { return nil }
func (m *mockService) ClashServer() *clashapi.Server { return nil }
func (m *mockService) Close() error { return nil }
func (m *mockService) UpdateOutbounds(options servers.Servers) error { return nil }
func (m *mockService) AddOutbounds(group string, options servers.Options) error { return nil }
func (m *mockService) RemoveOutbounds(group string, tags []string) error { return nil }
10 changes: 9 additions & 1 deletion vpn/ipc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (

const tracerName = "github.com/getlantern/radiance/vpn/ipc"

var protocols = func() *http.Protocols {
p := &http.Protocols{}
p.SetUnencryptedHTTP2(true)
return p
}()

// empty is a placeholder type for requests that do not expect a response body.
type empty struct{}

Expand All @@ -40,7 +46,9 @@ func sendRequest[T any](ctx context.Context, method, endpoint string, data any)
}
client := &http.Client{
Transport: &http.Transport{
DialContext: dialContext,
DialContext: dialContext,
Protocols: protocols,
ForceAttemptHTTP2: true,
},
}
resp, err := client.Do(req)
Expand Down
Loading