diff --git a/.golangci.yml b/.golangci.yml index 8d03263..53d5141 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -263,6 +263,20 @@ linters: deny: - pkg: github.com/MustardSeedNetworks/stem/internal/api desc: "ratelimit is a leaf — per-IP limiter takes injected dependencies, so it never imports the api transport layer" + # sse is a leaf of internal/api (ADR-0011): it depends only on + # stdlib — never on the api transport layer itself. This depguard + # rule enforces that boundary so a future accidental upward import + # fails CI rather than silently coupling the leaf back into the + # transport layer. + # Test files (broadcaster_test.go) are excluded: the _test.go + # external package imports the leaf itself, which is expected. + "api-sse-isolated": + files: + - "**/internal/api/sse/**" + - "!$test" + deny: + - pkg: github.com/MustardSeedNetworks/stem/internal/api + desc: "sse is a leaf — the broadcaster engine must not import the api transport layer; wire at the api layer" embeddedstructfieldcheck: # Checks that sync.Mutex and sync.RWMutex are not used as embedded fields. diff --git a/docs/adr/0011-internal-api-sub-package-decomposition.md b/docs/adr/0011-internal-api-sub-package-decomposition.md index 66f4baf..d4ab700 100644 --- a/docs/adr/0011-internal-api-sub-package-decomposition.md +++ b/docs/adr/0011-internal-api-sub-package-decomposition.md @@ -1,77 +1,63 @@ -# ADR 0011: Decompose `internal/api` into isolated sub-packages +# ADR-0011: internal/api sub-package decomposition -**Status:** Accepted (2026-06-16) +**Status:** Accepted +**Date:** 2026-06-16 ## Context -`internal/api` is the Stem HTTP transport layer (handlers, routing, the -capability registry). It has accreted several **self-contained infrastructure -concerns** into the same flat package alongside the transport code: - -- the per-client rate limiter (`ratelimit.go`, ~334 LOC); -- the SSE hub + streaming engine (`sse.go` / `handlers_sse.go` / - `sse_publishers.go`, ~345 LOC); -- the TLS listener setup + client-fingerprint logic (`tls.go` / - `tls_fingerprint.go`, ~462 LOC). - -Because everything is one Go package, any file can reach any other: there is no -signal or boundary stopping, say, the SSE engine from coupling to an auth -handler. The dependency-direction depguard (ADR-0003) protects the domain core -from importing the API layer, but it cannot express boundaries *within* -`internal/api`. The flat package is also the root cause of the snake_case -filename violations (`handlers_*.go`, `sse_*.go`, `tls_*.go`, …): the prefixes -are a package emulated inside one directory. - -This mirrors the situation seed (ADR-0001/0016/0020 strangle) and niac -(**ADR-0006**, Accepted 2026-06-08) already addressed. Stem aligns to the same -fleet convention. Note: stem's per-session CSRF manager already lives in its own -package (`internal/auth/csrf.go`), so it is **not** part of this decomposition. +`internal/api` is a single ~6 000-line package that contains the HTTP server, +all route handlers, middleware, the SSE broadcaster, and the rate limiter in +one flat namespace. Cross-cutting concerns (rate limiting, SSE) have no formal +boundary: any file can import any symbol, and the dependency direction from +leaf concerns back into the HTTP transport is invisible to static analysis. + +The first decomposition slice (PR #451) extracted the rate-limiter into +`internal/api/ratelimit`. This ADR records the second slice and the general +decomposition plan. ## Decision -Extract the cohesive infrastructure concerns into sibling **leaf packages** under -`internal/api//`, one at a time, lowest-coupling first -(`ratelimit` → `sse` → `tls`). Then strangle the remaining capability handlers -(`mfa`, `reflector`, `settings`, `license`, …) out of the flat layer so that -`internal/api` becomes pure transport + composition, with role-named files -(`handler.go`, `types.go`, …) and **no snake_case filenames**. - -Each leaf package: - -- owns its type(s), unexported helpers, and tests; -- imports **only** the standard library and inward domain packages — never the - `internal/api` transport layer; -- takes its transport-specific dependencies (client-IP extraction, error - rendering, event sources) as **injected function values**, so the dependency - arrow points inward; -- is composed by the `Server` (and `internal/daemon` where relevant), which - holds the concrete manager and wires it into the declarative route registry - (`register()`/`registerAll()` in `route.go`). The registry, `/__capabilities`, - the middleware composition order, and all security invariants - (CSRF, rate-limit, role/scope gates) are **unchanged** — only *where the - building blocks live* changes. - -Each extraction adds a depguard `api--isolated` rule (modelled on the -existing inward-only rule and niac's ADR-0006 rules) that denies the sub-package -importing `github.com/MustardSeedNetworks/stem/internal/api`, so the leaf -boundary is **CI-enforced**, not convention. - -Once the flat layer's snake_case filenames are eliminated, port seed's -`check-filename-policy.sh` gate into stem CI so the convention cannot regress. +Extract isolated, self-contained concerns from `internal/api` into **leaf +sub-packages** of the form `internal/api/`. A leaf: + +- imports ONLY stdlib and other inward packages (never the api transport layer) +- exports a constructor (`New(...)` or equivalent) returning an exported type +- is depguard-gated: a rule named `api--isolated` prevents accidental + upward imports in CI + +The api transport layer wires leaves at construction time; no leaf knows about +`internal/api`. + +### Completed slices + +| Slice | Package | PR | +|-------|---------|-----| +| Rate limiter | `internal/api/ratelimit` | #451 | +| SSE broadcaster | `internal/api/sse` | this ADR | + +### SSE slice (this ADR) + +`internal/api/sse` holds the `Broadcaster` type (fan-out engine), `Frame` +(wire type + `Encode`), and the subscriber bookkeeping. It has zero HTTP +imports. The HTTP handler (`handleSSEEvents`), the publisher loop +(`runReflectorStatsPublisher`), and the endpoint wiring stay in `internal/api`. + +The `sse.HeartbeatInterval` constant is exported so the transport layer can +reference it without duplicating the value; the api-layer file (`sse.go`) +re-derives its own constant from the same numeric literal to avoid an import +dependency in the other direction. + +### Future slices (candidates) + +| Concern | Notes | +|---------|-------| +| TLS utilities | `ensureSelfSignedCert`, `createTLSConfig`, ACME helpers | +| CORS logic | RFC 1918 origin validation | ## Consequences -- Boundaries within the API layer become explicit and CI-enforced; a future - change cannot silently couple a leaf to the transport layer. -- The snake_case filename violations disappear as a *consequence* of - decomposition (role-named files inside real packages), not via a cosmetic - rename — matching the fleet "guards encode best practice" stance. -- Behaviour is preserved at every step: extractions are verbatim moves with - inward-injected dependencies; the route registry and security middleware are - untouched. Each slice is independently gated (build, vet, full golangci, - tests, route-policy + output-escaping gates) before merge. -- Slices are **serial**, not parallel: they share `server.go` wiring and - `.golangci.yml`, so concurrent extraction PRs collide. Land one, rebase the - next. -- Stem reaches the same enforced-decomposition state as seed and niac, closing - the fleet-parity gap for `internal/api`. +- The leaf boundary is statically enforced by depguard (`api-sse-isolated`, + `api-ratelimit-isolated` rules in `.golangci.yml`). +- `go vet` + `golangci-lint` catch upward imports at CI time. +- `internal/api` package size decreases incrementally with each slice. +- No behaviour change: endpoints, event types, and publish sites are identical. diff --git a/docs/adr/README.md b/docs/adr/README.md index 8468b89..bc59d6f 100644 --- a/docs/adr/README.md +++ b/docs/adr/README.md @@ -14,6 +14,7 @@ survives the people and the diffs. Format mirrors the sibling repos (seed/niac). | [0006](0006-at-rest-encryption-device-keyed.md) | At-rest encryption device-keyed; DEK/JWT separation N/A | Accepted | | [0007](0007-ed25519-signed-licenses.md) | Ed25519-signed license tokens | Accepted | | [0008](0008-dataplane-parser-memory-safety.md) | Memory-safety gate for the C dataplane packet parser | Accepted | +| [0011](0011-internal-api-sub-package-decomposition.md) | internal/api sub-package decomposition (ratelimit + sse leaves) | Accepted | | [0009](0009-csrf-defense-in-depth-edges.md) | CSRF defense-in-depth at the edges | Accepted | | [0010](0010-json-wire-casing-convention.md) | JSON wire-casing convention (camelCase API) | Accepted | | [0011](0011-internal-api-sub-package-decomposition.md) | Decompose `internal/api` into isolated sub-packages | Accepted | diff --git a/internal/api/background_internal_test.go b/internal/api/background_internal_test.go index 86caa84..5c6b1d8 100644 --- a/internal/api/background_internal_test.go +++ b/internal/api/background_internal_test.go @@ -6,6 +6,8 @@ import ( "context" "testing" "time" + + "github.com/MustardSeedNetworks/stem/internal/api/sse" ) // TestBackgroundComponentsStartStop verifies the holder starts its goroutine @@ -15,7 +17,7 @@ import ( func TestBackgroundComponentsStartStop(t *testing.T) { t.Parallel() - s := &Server{sseBroadcaster: NewSSEBroadcaster()} + s := &Server{sseBroadcaster: sse.New()} bg := newBackgroundComponents(s) bg.Start(context.Background()) @@ -46,7 +48,7 @@ func TestBackgroundComponentsStopBeforeStart(t *testing.T) { func TestBackgroundComponentsStopIdempotent(t *testing.T) { t.Parallel() - s := &Server{sseBroadcaster: NewSSEBroadcaster()} + s := &Server{sseBroadcaster: sse.New()} bg := newBackgroundComponents(s) bg.Start(context.Background()) bg.Stop() @@ -59,7 +61,7 @@ func TestBackgroundComponentsStopIdempotent(t *testing.T) { func TestBackgroundComponentsCtxCancelStops(t *testing.T) { t.Parallel() - s := &Server{sseBroadcaster: NewSSEBroadcaster()} + s := &Server{sseBroadcaster: sse.New()} bg := newBackgroundComponents(s) ctx, cancel := context.WithCancel(context.Background()) diff --git a/internal/api/handlers_settings.go b/internal/api/handlers_settings.go index 2142bba..58fb98f 100644 --- a/internal/api/handlers_settings.go +++ b/internal/api/handlers_settings.go @@ -5,6 +5,7 @@ package api import ( "net/http" + "github.com/MustardSeedNetworks/stem/internal/api/sse" "github.com/MustardSeedNetworks/stem/internal/logging" "github.com/MustardSeedNetworks/stem/internal/netif" reflectorDP "github.com/MustardSeedNetworks/stem/internal/reflector/dataplane" @@ -161,7 +162,7 @@ func (s *Server) handleModeUpdate(w http.ResponseWriter, r *http.Request) { } // Broadcast even on "unchanged" so a browser tab that missed // the original change still gets a confirmation frame. - s.sseBroadcaster.Publish(SSEFrame{Type: "mode_changed", Payload: resp}) + s.sseBroadcaster.Publish(sse.Frame{Type: "mode_changed", Payload: resp}) writeJSON(w, resp) return } @@ -190,7 +191,7 @@ func (s *Server) handleModeUpdate(w http.ResponseWriter, r *http.Request) { } // Push to every connected SSE subscriber so other browser tabs / // CLI watchers see the mode change in real time (#296). - s.sseBroadcaster.Publish(SSEFrame{Type: "mode_changed", Payload: resp}) + s.sseBroadcaster.Publish(sse.Frame{Type: "mode_changed", Payload: resp}) writeJSON(w, resp) } diff --git a/internal/api/handlers_sse.go b/internal/api/handlers_sse.go index d77c831..7d5f5a0 100644 --- a/internal/api/handlers_sse.go +++ b/internal/api/handlers_sse.go @@ -1,3 +1,5 @@ +// SPDX-License-Identifier: BUSL-1.1 + package api import ( @@ -10,7 +12,7 @@ import ( // handleSSEEvents serves the long-lived SSE stream at /api/v1/events. // -// Wire protocol: each frame is one JSON object preceded by `data: ` and +// Wire protocol: each frame is one JSON object preceded by "data: " and // followed by a blank line. Periodic SSE-comment heartbeats keep // intermediaries (load balancers, reverse proxies) from idling the // connection — they discard comments so the UI never sees them. diff --git a/internal/api/server.go b/internal/api/server.go index 623b9af..77d3560 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -83,6 +83,7 @@ import ( "time" "github.com/MustardSeedNetworks/stem/internal/api/ratelimit" + "github.com/MustardSeedNetworks/stem/internal/api/sse" "github.com/MustardSeedNetworks/stem/internal/auth" "github.com/MustardSeedNetworks/stem/internal/license" "github.com/MustardSeedNetworks/stem/internal/logging" @@ -139,7 +140,7 @@ var staticFiles embed.FS type Server struct { port int mux *http.ServeMux - sseBroadcaster *SSEBroadcaster // Fan-out for /api/v1/events subscribers; nil-safe via getSSE() + sseBroadcaster *sse.Broadcaster // Fan-out for /api/v1/events subscribers; nil when SSE not available. httpServer *http.Server stats *Stats statsMu sync.RWMutex @@ -261,7 +262,7 @@ func NewServer(port int) (*Server, error) { s := &Server{} s.port = port s.mux = http.NewServeMux() - s.sseBroadcaster = NewSSEBroadcaster() + s.sseBroadcaster = sse.New() s.statsMu = sync.RWMutex{} s.stats = &Stats{ PacketsReceived: 0, diff --git a/internal/api/sse.go b/internal/api/sse.go index 51cae3c..185b3ac 100644 --- a/internal/api/sse.go +++ b/internal/api/sse.go @@ -1,153 +1,15 @@ -package api - -import ( - "encoding/json" - "sync" - "time" -) - -// SSE (server-sent events) broadcaster. Publishers call Publish() with a -// typed frame; every connected subscriber gets a copy via its channel. -// -// Slow subscribers are dropped (their channel is closed) rather than back- -// pressuring publishers — this keeps the broadcaster non-blocking from the -// publisher side, which matters because the mode-switch path runs under -// the request handler's goroutine and must not stall. -// -// Frame types are listed in the comment on Publish. - -// SSEFrame is the wire form of an event frame. The `type` field is the -// discriminator the UI consumes; `payload` is the per-type body. -// -// Reflector stats and test progress frames replicate the structure of -// the matching REST responses so the consumer can drop the same -// rendering code into either source. -type SSEFrame struct { - Type string `json:"type"` - Payload any `json:"payload"` -} - -// sseSubscriber owns one subscriber's outbound channel. Buffered so a -// brief client stall doesn't drop frames; capacity is bounded so a -// permanently-stalled client gets evicted instead of leaking memory. -type sseSubscriber struct { - id uint64 - ch chan SSEFrame -} - -const ( - // sseSubscriberBufferSize bounds the per-subscriber buffer; small - // enough that a slow client drops within a couple of seconds at the - // expected ~1Hz reflector-stats cadence, large enough that a brief - // network blip doesn't trip eviction. - sseSubscriberBufferSize = 16 +// SPDX-License-Identifier: BUSL-1.1 - // sseHeartbeatInterval is how often the handler sends an SSE comment - // line (": heartbeat\n\n") to keep idle proxies from closing the - // connection. 15s is a common threshold. - sseHeartbeatInterval = 15 * time.Second -) - -// SSEBroadcaster fans out SSE frames to all connected subscribers. -// -// Process-wide singleton initialised once at server construction. The -// zero value is safe; New is for clarity at the call site. -type SSEBroadcaster struct { - mu sync.RWMutex - subs map[uint64]*sseSubscriber - nextID uint64 -} - -// NewSSEBroadcaster returns a ready broadcaster with no subscribers. -func NewSSEBroadcaster() *SSEBroadcaster { - return &SSEBroadcaster{subs: make(map[uint64]*sseSubscriber)} -} - -// Subscribe registers a new subscriber and returns the channel to read -// frames from along with an unsubscribe function. The unsubscribe must -// be called (defer is the usual pattern) so the broadcaster doesn't -// hold a stale entry forever. -func (b *SSEBroadcaster) Subscribe() (<-chan SSEFrame, func()) { - b.mu.Lock() - defer b.mu.Unlock() - b.nextID++ - id := b.nextID - sub := &sseSubscriber{ - id: id, - ch: make(chan SSEFrame, sseSubscriberBufferSize), - } - b.subs[id] = sub - return sub.ch, func() { b.unsubscribe(id) } -} - -func (b *SSEBroadcaster) unsubscribe(id uint64) { - b.mu.Lock() - defer b.mu.Unlock() - sub, ok := b.subs[id] - if !ok { - return - } - delete(b.subs, id) - close(sub.ch) -} - -// Publish fans out a frame to every subscriber. Slow subscribers (whose -// buffer is full) are dropped — their channel is closed and they're -// removed. Returns the number of subscribers that received the frame. -// -// Known frame types: -// -// - "mode_changed": payload is ModeUpdateResponse (matches the -// /api/v1/mode POST response) -// - "reflector_stats": payload is ReflectorStats (matches -// /api/v1/reflector/stats) -// - "test_progress": payload is a per-test progress struct (TODO: -// once test runner exposes a progress channel) -func (b *SSEBroadcaster) Publish(frame SSEFrame) int { - // Hold the read lock across the sends so unsubscribe (which takes - // the write lock to close the channel) can't race a "send on closed - // channel" panic with us. Sends here are non-blocking (select + - // default), so holding the read lock has bounded duration. - b.mu.RLock() - var stalled []uint64 - delivered := 0 - for _, sub := range b.subs { - select { - case sub.ch <- frame: - delivered++ - default: - // Subscriber buffer is full — they're stalled. Drop them - // after we release the read lock so the broadcaster stays - // non-blocking from the publisher's perspective. - stalled = append(stalled, sub.id) - } - } - b.mu.RUnlock() +package api - for _, id := range stalled { - b.unsubscribe(id) - } - return delivered -} +import "time" -// sseFrameOverhead is the byte count of the SSE wire-format wrapper -// (`data: ` prefix + `\n\n` suffix) we add around the JSON payload. -// Constant so the pre-allocation in Encode doesn't trip mnd. -const sseFrameOverhead = len("data: ") + len("\n\n") +// sseHeartbeatSec is the raw seconds value for the SSE heartbeat interval. +// Matches [sse.HeartbeatInterval] from the leaf package so the two stay in +// sync without creating an import dependency. +const sseHeartbeatSec = 15 -// Encode renders a frame in SSE wire format: a `data:` line followed by -// the JSON payload and a terminating blank line. Multi-line JSON is -// guarded by serializing single-line. -// -// Returns the bytes including trailing "\n\n". -func (frame SSEFrame) Encode() ([]byte, error) { - data, err := json.Marshal(frame) - if err != nil { - return nil, err - } - out := make([]byte, 0, len(data)+sseFrameOverhead) - out = append(out, "data: "...) - out = append(out, data...) - out = append(out, '\n', '\n') - return out, nil -} +// sseHeartbeatInterval is how often [handleSSEEvents] sends an SSE comment +// line (": heartbeat\n\n") to keep idle proxies from closing the connection. +// 15 s is a common threshold. +const sseHeartbeatInterval = sseHeartbeatSec * time.Second diff --git a/internal/api/sse/broadcaster.go b/internal/api/sse/broadcaster.go new file mode 100644 index 0000000..c2b9518 --- /dev/null +++ b/internal/api/sse/broadcaster.go @@ -0,0 +1,145 @@ +// SPDX-License-Identifier: BUSL-1.1 + +// Package sse implements the SSE (server-sent events) broadcaster that fans out +// typed event frames to connected subscribers. It is a leaf of internal/api +// (ADR-0011): it depends only on the standard library — never on the api +// transport layer itself, so the boundary is enforced by depguard. +// +// The broadcaster is the event-broadcast machinery only. HTTP serving (headers, +// flusher, heartbeat loop) lives in the api transport layer (handlers_sse.go) +// and calls into this package via [Broadcaster.Subscribe]. +package sse + +import ( + "encoding/json" + "slices" + "sync" +) + +// subscriberBufferSize bounds the per-subscriber channel. Small enough that a +// slow client drops within a couple of seconds at the expected ~1 Hz reflector- +// stats cadence; large enough that a brief network blip doesn't trip eviction. +const subscriberBufferSize = 16 + +// HeartbeatInterval is how often the HTTP handler should send an SSE comment +// line (": heartbeat\n\n") to keep idle proxies from closing the connection. +// 15 s is a common threshold. Exported so the transport layer uses the same +// constant without duplicating it. +const HeartbeatInterval = 15 // seconds + +// Frame is the wire form of an SSE event. The Type field is the discriminator +// the UI consumes; Payload is the per-type body. +// +// Reflector-stats and test-progress frames replicate the structure of the +// matching REST responses so the consumer can drop the same rendering code +// into either source. +type Frame struct { + Type string `json:"type"` + Payload any `json:"payload"` +} + +// Encode renders a frame in SSE wire format: a "data:" line followed by the +// JSON-encoded frame and a terminating blank line. The JSON is always single- +// line (marshalled by [json.Marshal] with no indent), which satisfies the SSE +// spec requirement that the data field not span multiple "data:" lines. +// +// Returns the bytes including trailing "\n\n". +func (f Frame) Encode() ([]byte, error) { + data, err := json.Marshal(f) + if err != nil { + return nil, err + } + // slices.Concat sizes the result internally (with its own overflow guard), + // so there is no hand-rolled make-capacity arithmetic for a static analyzer + // to flag as a potential allocation-size overflow. + return slices.Concat([]byte("data: "), data, []byte("\n\n")), nil +} + +// subscriber owns one connected client's outbound channel. Buffered so a +// brief stall doesn't drop frames; bounded so a permanently-stalled client +// gets evicted instead of leaking memory. +type subscriber struct { + id uint64 + ch chan Frame +} + +// Broadcaster fans out SSE frames to all connected subscribers. +// +// Process-wide singleton initialised once at server construction. The zero +// value is safe; [New] is for clarity at the call site. +type Broadcaster struct { + mu sync.RWMutex + subs map[uint64]*subscriber + nextID uint64 +} + +// New returns a ready Broadcaster with no subscribers. +func New() *Broadcaster { + return &Broadcaster{subs: make(map[uint64]*subscriber)} +} + +// Subscribe registers a new subscriber and returns the channel to read frames +// from, along with an unsubscribe function. The unsubscribe must be called +// (defer is the usual pattern) so the broadcaster does not hold a stale entry +// forever. +func (b *Broadcaster) Subscribe() (<-chan Frame, func()) { + b.mu.Lock() + defer b.mu.Unlock() + b.nextID++ + id := b.nextID + sub := &subscriber{ + id: id, + ch: make(chan Frame, subscriberBufferSize), + } + b.subs[id] = sub + return sub.ch, func() { b.unsubscribe(id) } +} + +func (b *Broadcaster) unsubscribe(id uint64) { + b.mu.Lock() + defer b.mu.Unlock() + sub, ok := b.subs[id] + if !ok { + return + } + delete(b.subs, id) + close(sub.ch) +} + +// Publish fans out a frame to every subscriber. Slow subscribers (whose buffer +// is full) are dropped — their channel is closed and they are removed. Returns +// the number of subscribers that received the frame. +// +// Known frame types: +// +// - "mode_changed": payload is ModeUpdateResponse (matches the +// /api/v1/mode POST response) +// - "reflector_stats": payload is ReflectorStats (matches +// /api/v1/reflector/stats) +// - "test_progress": payload is a per-test progress struct +func (b *Broadcaster) Publish(frame Frame) int { + // Hold the read lock across the sends so unsubscribe (which takes the + // write lock to close the channel) cannot race a "send on closed channel" + // panic with us. Sends here are non-blocking (select + default), so + // holding the read lock has bounded duration. + b.mu.RLock() + var stalled []uint64 + delivered := 0 + for _, sub := range b.subs { + select { + case sub.ch <- frame: + delivered++ + default: + // Subscriber buffer is full — they are stalled. Drop them after + // we release the read lock so the broadcaster stays non-blocking + // from the publisher's perspective. + stalled = append(stalled, sub.id) + } + } + b.mu.RUnlock() + + for _, id := range stalled { + b.unsubscribe(id) + } + return delivered +} diff --git a/internal/api/sse/broadcaster_test.go b/internal/api/sse/broadcaster_test.go new file mode 100644 index 0000000..a8ee884 --- /dev/null +++ b/internal/api/sse/broadcaster_test.go @@ -0,0 +1,161 @@ +// SPDX-License-Identifier: BUSL-1.1 + +package sse_test + +import ( + "encoding/json" + "strings" + "sync" + "testing" + + "github.com/MustardSeedNetworks/stem/internal/api/sse" +) + +func TestBroadcaster_PublishToSubscribers(t *testing.T) { + t.Parallel() + b := sse.New() + ch1, unsub1 := b.Subscribe() + defer unsub1() + ch2, unsub2 := b.Subscribe() + defer unsub2() + + want := sse.Frame{Type: "mode_changed", Payload: map[string]string{"mode": "reflector"}} + const expectedSubs = 2 + delivered := b.Publish(want) + if delivered != expectedSubs { + t.Errorf("expected delivery to %d subscribers, got %d", expectedSubs, delivered) + } + + for i, ch := range []<-chan sse.Frame{ch1, ch2} { + got, ok := <-ch + if !ok { + t.Errorf("subscriber %d: channel closed unexpectedly", i) + continue + } + if got.Type != "mode_changed" { + t.Errorf("subscriber %d: type = %q, want mode_changed", i, got.Type) + } + } +} + +func TestBroadcaster_UnsubscribeStopsDelivery(t *testing.T) { + t.Parallel() + b := sse.New() + ch, unsub := b.Subscribe() + + unsub() + // After unsubscribe, the channel is closed. + _, open := <-ch + if open { + t.Error("expected channel closed after unsubscribe") + } + + // Publishing after unsubscribe should reach zero subscribers. + delivered := b.Publish(sse.Frame{Type: "test_progress"}) + if delivered != 0 { + t.Errorf("expected 0 deliveries after unsubscribe, got %d", delivered) + } +} + +func TestBroadcaster_SlowSubscriberDropped(t *testing.T) { + t.Parallel() + + // subscriberBufferSize is not exported; use enough frames to definitely + // overflow a buffer of 16 (the configured default). + const overflowFrames = 20 + + b := sse.New() + ch, unsub := b.Subscribe() + defer unsub() + + // Fill the subscriber's buffer beyond capacity so the next publish + // finds it stalled and drops it. + for i := range overflowFrames { + b.Publish(sse.Frame{Type: "test", Payload: i}) + } + + // Drain. Channel should close eventually because subscriber was evicted. + const maxAttempts = 100 + for attempt := range maxAttempts { + select { + case _, open := <-ch: + if !open { + // Channel closed — eviction happened, as expected. + return + } + default: + if attempt == maxAttempts-1 { + t.Fatal("expected channel close after slow-subscriber eviction") + } + } + } +} + +func TestBroadcaster_ConcurrentSubscribePublish(_ *testing.T) { + // Race-detector smoke test: many goroutines subscribing and publishing in + // parallel must not race or panic. Subscribers drain on a best-effort basis + // — we don't assert delivery counts, only that the operations complete + // cleanly under -race. + b := sse.New() + var wg sync.WaitGroup + + const ( + publishers = 30 + subscribers = 10 + maxDrainPerCh = 3 + ) + for i := range publishers { + wg.Go(func() { + b.Publish(sse.Frame{Type: "burst", Payload: i}) + }) + } + + for range subscribers { + wg.Go(func() { + ch, unsub := b.Subscribe() + drained := 0 + drain: + for drained < maxDrainPerCh { + select { + case _, ok := <-ch: + if !ok { + break drain + } + drained++ + default: + break drain + } + } + unsub() + }) + } + + wg.Wait() +} + +func TestFrame_Encode(t *testing.T) { + t.Parallel() + frame := sse.Frame{Type: "mode_changed", Payload: map[string]string{"mode": "reflector"}} + encoded, err := frame.Encode() + if err != nil { + t.Fatalf("encode failed: %v", err) + } + + s := string(encoded) + if !strings.HasPrefix(s, "data: ") { + t.Errorf("expected `data: ` prefix, got %q", s[:20]) + } + if !strings.HasSuffix(s, "\n\n") { + t.Errorf("expected trailing blank line (\\n\\n), got tail %q", s[len(s)-4:]) + } + + // The body between "data: " and "\n\n" is valid JSON. + body := strings.TrimSuffix(strings.TrimPrefix(s, "data: "), "\n\n") + var got sse.Frame + if jsonErr := json.Unmarshal([]byte(body), &got); jsonErr != nil { + t.Errorf("encoded body is not valid JSON: %v", jsonErr) + } + if got.Type != "mode_changed" { + t.Errorf("decoded type = %q, want mode_changed", got.Type) + } +} diff --git a/internal/api/sse_internal_test.go b/internal/api/sse_internal_test.go index 788e419..d637cec 100644 --- a/internal/api/sse_internal_test.go +++ b/internal/api/sse_internal_test.go @@ -1,154 +1,23 @@ package api import ( - "encoding/json" - "strings" - "sync" "testing" -) - -func TestSSEBroadcaster_PublishToSubscribers(t *testing.T) { - b := NewSSEBroadcaster() - ch1, unsub1 := b.Subscribe() - defer unsub1() - ch2, unsub2 := b.Subscribe() - defer unsub2() - - want := SSEFrame{Type: "mode_changed", Payload: map[string]string{"mode": "reflector"}} - const expectedSubs = 2 - delivered := b.Publish(want) - if delivered != expectedSubs { - t.Errorf("expected delivery to %d subscribers, got %d", expectedSubs, delivered) - } - - for i, ch := range []<-chan SSEFrame{ch1, ch2} { - got, ok := <-ch - if !ok { - t.Errorf("subscriber %d: channel closed unexpectedly", i) - continue - } - if got.Type != "mode_changed" { - t.Errorf("subscriber %d: type = %q, want mode_changed", i, got.Type) - } - } -} - -func TestSSEBroadcaster_UnsubscribeStopsDelivery(t *testing.T) { - b := NewSSEBroadcaster() - ch, unsub := b.Subscribe() - - unsub() - // After unsubscribe, the channel is closed. - _, open := <-ch - if open { - t.Error("expected channel closed after unsubscribe") - } - - // Publishing after unsubscribe should reach zero subscribers. - delivered := b.Publish(SSEFrame{Type: "test_progress"}) - if delivered != 0 { - t.Errorf("expected 0 deliveries after unsubscribe, got %d", delivered) - } -} - -func TestSSEBroadcaster_SlowSubscriberDropped(t *testing.T) { - b := NewSSEBroadcaster() - ch, unsub := b.Subscribe() - defer unsub() - // Fill the subscriber's buffer beyond capacity so the next publish - // finds it stalled and drops it. - for i := range sseSubscriberBufferSize + 2 { - b.Publish(SSEFrame{Type: "test", Payload: i}) - } - - // Drain. Channel should close eventually because subscriber was evicted. - const maxAttempts = 100 - for attempt := range maxAttempts { - select { - case _, open := <-ch: - if !open { - // Channel closed — eviction happened, as expected. - return - } - default: - if attempt == maxAttempts-1 { - t.Fatal("expected channel close after slow-subscriber eviction") - } - } - } -} - -func TestSSEBroadcaster_ConcurrentSubscribePublish(_ *testing.T) { - // Race-detector smoke test: many goroutines subscribing and - // publishing in parallel must not race or panic. Subscribers - // drain on a best-effort basis — we don't assert delivery counts, - // only that the operations complete cleanly under -race. - b := NewSSEBroadcaster() - var wg sync.WaitGroup - - // Publishers: 30 frames, fire-and-forget. - const ( - publishers = 30 - subscribers = 10 - maxDrainPerCh = 3 - ) - for i := range publishers { - wg.Go(func() { - b.Publish(SSEFrame{Type: "burst", Payload: i}) - }) - } - - // Subscribers: come and go, drain whatever arrives in their lifetime. - for range subscribers { - wg.Go(func() { - ch, unsub := b.Subscribe() - // Drain in a non-blocking loop so we can exit promptly even - // when no frame arrives. - drained := 0 - drain: - for drained < maxDrainPerCh { - select { - case _, ok := <-ch: - if !ok { - break drain - } - drained++ - default: - // No frame ready; bail out — this is a smoke test, - // not a delivery-count assertion. - break drain - } - } - unsub() - }) - } - - wg.Wait() -} - -func TestSSEFrame_Encode(t *testing.T) { - frame := SSEFrame{Type: "mode_changed", Payload: map[string]string{"mode": "reflector"}} - encoded, err := frame.Encode() - if err != nil { - t.Fatalf("encode failed: %v", err) - } - - s := string(encoded) - if !strings.HasPrefix(s, "data: ") { - t.Errorf("expected `data: ` prefix, got %q", s[:20]) - } - if !strings.HasSuffix(s, "\n\n") { - t.Errorf("expected trailing blank line (\\n\\n), got tail %q", s[len(s)-4:]) - } + "github.com/MustardSeedNetworks/stem/internal/api/sse" +) - // The body between "data: " and "\n\n" is valid JSON. - body := strings.TrimSuffix(strings.TrimPrefix(s, "data: "), "\n\n") - var got SSEFrame - if jsonErr := json.Unmarshal([]byte(body), &got); jsonErr != nil { - t.Errorf("encoded body is not valid JSON: %v", jsonErr) - } - if got.Type != "mode_changed" { - t.Errorf("decoded type = %q, want mode_changed", got.Type) - } +// TestSSETransport_BroadcasterWiring verifies the transport-layer integration +// points: the Server struct holds an *sse.Broadcaster, and PublishTestProgress +// does not panic when called with no subscribers. +// +// Comprehensive broadcaster behaviour tests live in [internal/api/sse]. +func TestSSETransport_BroadcasterWiring(_ *testing.T) { + // Construct a minimal Server with only the broadcaster initialised so + // we can exercise the wiring without needing the full NewServer auth + // environment variables. + s := &Server{sseBroadcaster: sse.New()} + + // Publishing with no subscribers must not panic. + s.sseBroadcaster.Publish(sse.Frame{Type: "smoke", Payload: nil}) + s.PublishTestProgress("tid-1", map[string]string{"step": "smoke"}) } diff --git a/internal/api/sse_publishers.go b/internal/api/sse_publishers.go index 2643ec0..715b95f 100644 --- a/internal/api/sse_publishers.go +++ b/internal/api/sse_publishers.go @@ -1,14 +1,17 @@ +// SPDX-License-Identifier: BUSL-1.1 + package api import ( "context" "time" + "github.com/MustardSeedNetworks/stem/internal/api/sse" "github.com/MustardSeedNetworks/stem/internal/logging" ) // reflectorStatsInterval is how often the reflector-stats publisher -// computes and broadcasts a stats frame. 1Hz matches the polling +// computes and broadcasts a stats frame. 1 Hz matches the polling // cadence the UI was using before this PR; the SSE channel just // removes the round-trip overhead and tightens the latency. const reflectorStatsInterval = time.Second @@ -55,7 +58,7 @@ func (s *Server) broadcastReflectorStatsIfActive() { } stats := s.buildActiveReflectorStats(exec, elapsed) - s.sseBroadcaster.Publish(SSEFrame{ + s.sseBroadcaster.Publish(sse.Frame{ Type: "reflector_stats", Payload: stats, }) @@ -74,7 +77,7 @@ func (s *Server) PublishTestProgress(testID string, progress any) { if s.sseBroadcaster == nil { return } - s.sseBroadcaster.Publish(SSEFrame{ + s.sseBroadcaster.Publish(sse.Frame{ Type: "test_progress", Payload: map[string]any{ "testId": testID,