feat: Add WebSocket Hub and Client infrastructure#9
Conversation
- Add gorilla/websocket v1.5.3 dependency - Create Hub with client registration, unregistration, and broadcasting - Create Client with readPump/writePump goroutines, ping/pong, panic recovery - Create Message envelope type for structured WebSocket events - Add BroadcastSender interface for decoupled dependency injection - Add Hub.Shutdown() for graceful close of all client connections - Add comprehensive tests (7 tests covering hub lifecycle, broadcast, slow client eviction, interface compliance, and message serialization) Closes #2
There was a problem hiding this comment.
Pull request overview
Adds a new internal/websocket package to provide a Hub/Client abstraction around gorilla/websocket for backend real-time messaging, including a basic message envelope and unit tests.
Changes:
- Introduces
Hubfor client registration/unregistration, broadcast fan-out, and shutdown. - Introduces
Clientwith read/write pump goroutines (ping/pong, batching). - Adds
Messageenvelope helpers + tests, and adds thegorilla/websocketdependency.
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| backend/internal/websocket/hub.go | Adds the Hub event loop, broadcast API, and shutdown logic. |
| backend/internal/websocket/client.go | Adds WebSocket client lifecycle + read/write goroutines. |
| backend/internal/websocket/message.go | Adds a typed message envelope with JSON helpers. |
| backend/internal/websocket/hub_test.go | Adds unit tests for hub behavior and message helpers. |
| backend/go.mod | Adds github.com/gorilla/websocket dependency. |
| backend/go.sum | Records checksums for the new dependency. |
You can also share your feedback on Copilot code review. Take the survey.
backend/internal/websocket/client.go
Outdated
| if r := recover(); r != nil { | ||
| slog.Error("Panic in WebSocket readPump", "recover", r) | ||
| } | ||
| c.hub.unregister <- c |
backend/internal/websocket/client.go
Outdated
| conn: conn, | ||
| send: make(chan []byte, sendBufferSize), | ||
| } | ||
| hub.register <- client |
| // Shutdown gracefully stops the hub's Run loop and closes all client connections. | ||
| func (h *Hub) Shutdown() { | ||
| close(h.done) | ||
| } |
| hub := NewHub() | ||
| go hub.Run() | ||
| defer hub.Shutdown() | ||
|
|
||
| clients := make([]*Client, tt.registerCount) | ||
| for i := range tt.registerCount { | ||
| c := &Client{ | ||
| hub: hub, | ||
| send: make(chan []byte, sendBufferSize), | ||
| } | ||
| hub.register <- c | ||
| clients[i] = c | ||
| } | ||
|
|
||
| time.Sleep(50 * time.Millisecond) | ||
| assert.Equal(t, tt.registerCount, hub.ClientCount()) | ||
|
|
||
| if tt.unregisterAll { | ||
| for _, c := range clients { | ||
| hub.unregister <- c | ||
| } | ||
| time.Sleep(50 * time.Millisecond) | ||
| } |
| // NewMessage creates a Message with the given type and payload. | ||
| // The payload is JSON-marshalled; if marshalling fails the payload is set to null. | ||
| func NewMessage(msgType string, payload interface{}) Message { | ||
| data, err := json.Marshal(payload) | ||
| if err != nil { | ||
| data = []byte("null") | ||
| } | ||
| return Message{ | ||
| Type: msgType, | ||
| Payload: data, | ||
| } | ||
| } |
| b, err := json.Marshal(m) | ||
| if err != nil { | ||
| return []byte(`{"type":"error","payload":null}`) | ||
| } | ||
| return b | ||
| } |
- Make Hub.Shutdown() idempotent via sync.Once to prevent double-close panic - Add Hub.Register()/Unregister() methods that select on hub.done to prevent goroutine leaks and deadlocks during shutdown - Update NewClient to use Hub.Register(), returning error on closed hub - Update readPump to use Hub.Unregister() instead of raw channel send - Change NewMessage signature to (Message, error) to surface marshal failures - Change Message.Bytes() signature to ([]byte, error) for explicit error handling - Replace time.Sleep in tests with assert.Eventually polling - Add tests for idempotent shutdown and register-after-shutdown
There was a problem hiding this comment.
Pull request overview
Introduces a new backend/internal/websocket package providing the foundational Hub/Client/message envelope needed to support real-time WebSocket broadcasting from the Go backend.
Changes:
- Added
Hubto manage client registration/unregistration and broadcasting, plus graceful shutdown. - Added
Clientwith read/write pumps (ping/pong, deadlines, write queue handling). - Added
Messageenvelope helpers + unit tests; addedgithub.com/gorilla/websocketdependency.
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| backend/internal/websocket/message.go | Defines a JSON envelope (type + payload) with helpers to build/serialize messages. |
| backend/internal/websocket/hub.go | Implements the Hub event loop, client tracking, broadcast fan-out, and shutdown behavior. |
| backend/internal/websocket/client.go | Implements per-connection goroutines for reading/writing, ping/pong, and lifecycle cleanup. |
| backend/internal/websocket/hub_test.go | Adds unit tests for hub behaviors and message helpers. |
| backend/go.mod | Adds github.com/gorilla/websocket v1.5.3. |
| backend/go.sum | Adds checksums for the gorilla/websocket dependency. |
You can also share your feedback on Copilot code review. Take the survey.
| hub.register <- c1 | ||
| hub.register <- c2 | ||
| time.Sleep(50 * time.Millisecond) | ||
|
|
| hub.register <- c | ||
| time.Sleep(50 * time.Millisecond) | ||
| assert.Equal(t, 1, hub.ClientCount()) | ||
|
|
||
| hub.Shutdown() | ||
| time.Sleep(50 * time.Millisecond) | ||
|
|
||
| assert.Equal(t, 0, hub.ClientCount()) |
backend/internal/websocket/client.go
Outdated
| // Drain queued messages into the same write frame for efficiency. | ||
| n := len(c.send) | ||
| for i := 0; i < n; i++ { | ||
| if _, err := w.Write([]byte("\n")); err != nil { | ||
| break | ||
| } | ||
| if _, err := w.Write(<-c.send); err != nil { | ||
| break | ||
| } | ||
| } | ||
|
|
| defer hub.Shutdown() | ||
|
|
||
| clients := make([]*Client, tt.registerCount) | ||
| for i := range tt.registerCount { |
| hub.register <- c | ||
| clients[i] = c | ||
| } | ||
|
|
||
| time.Sleep(50 * time.Millisecond) | ||
| assert.Equal(t, tt.registerCount, hub.ClientCount()) |
- Extract broadcastBufferSize constant in hub.go - Fix broadcast loop: collect slow clients under RLock, remove under Lock - Reorder handleSend to check channel close before SetWriteDeadline - Extract writePump helpers: handleSend, writePing, writeMessage - Send each queued message as a separate WS frame (no newline batching) - Use sync.Once for idempotent Shutdown, Register/Unregister with done select - NewMessage/Bytes return errors instead of silently failing - Replace time.Sleep with assert.Eventually in all tests - Fix TestHub_BroadcastChannelFull race: test Broadcast() without Run() - Add agent definition files for multi-agent workflow
Summary
Adds the core WebSocket infrastructure to the backend using
gorilla/websocket.Changes
internal/websocket/hub.go—Hubstruct with client registration/unregistration/broadcasting via channels andsync.RWMutex.BroadcastSenderinterface for decoupled dependency injection.Shutdown()for graceful close.internal/websocket/client.go—Clientstruct withreadPump()/writePump()goroutines handling ping/pong, dead connection detection, write batching, and panic recovery.internal/websocket/message.go—Messageenvelope type (type+payloadJSON),NewMessage()factory,Bytes()serializer.internal/websocket/hub_test.go— 7 tests: register/unregister (table-driven), broadcast to multiple clients, shutdown closes all, slow client eviction,BroadcastSenderinterface compliance,NewMessage(table-driven),Message.Bytesround-trip.go.mod/go.sum— Addedgithub.com/gorilla/websocket v1.5.3\- **go.mod/go.sum** — Addedgithub.com/gorilla/websocket v1.5.3- **luding the newwebsocketpackage. Zerogo vetwarnings.Closes #2