Complete guide to using Server-Sent Events with github.com/coregx/stream/sse.
- What is SSE?
- When to Use SSE
- Quick Start
- Event Structure
- Connection Lifecycle
- Broadcasting with Hub
- Best Practices
- Error Handling
- Production Considerations
- Advanced Patterns
- Comparison with WebSocket
Server-Sent Events (SSE) is a standard for servers to push real-time updates to clients over HTTP. It is defined in the HTML Living Standard (it was introduced with HTML5) and uses the text/event-stream content type.
- Unidirectional: Server → Client only
- Text-based: UTF-8 encoded event stream
- HTTP-based: Works over standard HTTP/HTTPS
- Auto-reconnect: Browsers automatically reconnect on disconnect
- EventSource API: Native browser support (no libraries needed)
event: eventType
id: uniqueId
retry: 3000
data: line 1
data: line 2
Each event ends with a blank line (\n\n).
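
As a rough illustration of the framing above, the following Go sketch builds one event by hand using only the standard library. The function name formatEvent and its parameters are made up for this example; they are not part of the sse package, whose own serialization may differ in detail.

```go
package main

import (
	"fmt"
	"strings"
)

// formatEvent renders one event in text/event-stream framing.
// Hypothetical helper: empty eventType/id and retryMS <= 0 are omitted.
func formatEvent(eventType, id string, retryMS int, data string) string {
	var b strings.Builder
	if eventType != "" {
		fmt.Fprintf(&b, "event: %s\n", eventType)
	}
	if id != "" {
		fmt.Fprintf(&b, "id: %s\n", id)
	}
	if retryMS > 0 {
		fmt.Fprintf(&b, "retry: %d\n", retryMS)
	}
	// Each line of the payload gets its own "data:" field.
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(&b, "data: %s\n", line)
	}
	b.WriteString("\n") // a blank line terminates the event
	return b.String()
}

func main() {
	fmt.Print(formatEvent("eventType", "uniqueId", 3000, "line 1\nline 2"))
}
```

The event type and ID must not contain newlines themselves, because a newline would start a new field.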
Good fits for SSE:
- Live notifications (new message, alert, status update)
- Real-time dashboards (metrics, analytics, monitoring)
- Activity streams (social feeds, logs, events)
- Live scores/tickers (sports, stocks, news)
- Progress updates (file upload, job status)
- Server logs streaming (tail -f over HTTP)
Poor fits for SSE:
- Bidirectional communication (use WebSocket)
- Binary data (use WebSocket or chunked transfer)
- High-frequency updates (>10 events/sec → WebSocket)
- Client → Server messaging (POST or WebSocket)
| Feature | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client | Bidirectional |
| Protocol | HTTP | WebSocket (ws://, over TCP) |
| Data | Text (UTF-8) | Binary + Text |
| Browser API | EventSource | WebSocket |
| Reconnect | Automatic | Manual |
| Overhead | Lower | Higher |
| Complexity | Simpler | More complex |
Rule of thumb: Use SSE for server-push notifications, WebSocket for real-time collaboration.
package main
import (
"net/http"
"time"
"github.com/coregx/stream/sse"
)
func main() {
http.HandleFunc("/events", handleSSE)
http.ListenAndServe(":8080", nil)
}
func handleSSE(w http.ResponseWriter, r *http.Request) {
// Upgrade to SSE
conn, err := sse.Upgrade(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
// Send events
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
conn.SendData("ping")
case <-conn.Done():
return
}
}
}

curl http://localhost:8080/events

Output:
: connected
data: ping
data: ping
data: ping
...
const eventSource = new EventSource('http://localhost:8080/events');
eventSource.onmessage = (e) => {
console.log('Received:', e.data);
};
eventSource.onerror = (err) => {
console.error('Error:', err);
};

The Event type represents a Server-Sent Event:
type Event struct {
Type string // Event type (optional)
ID string // Event ID (optional)
Data string // Event data (required)
Retry int // Reconnect time in ms (optional)
}

event := sse.NewEvent("Hello, World!").
WithType("greeting").
WithID("msg-1").
WithRetry(3000)
// Serializes to:
// event: greeting
// id: msg-1
// retry: 3000
// data: Hello, World!
//

Data is the message payload. It can span multiple lines:
event := sse.NewEvent("line1\nline2\nline3")
// Becomes:
// data: line1
// data: line2
// data: line3
//

Type is the event type, used for client-side filtering:
event := sse.NewEvent("New order!").WithType("order")

JavaScript:
eventSource.addEventListener('order', (e) => {
console.log('Order event:', e.data);
});

ID is a unique event ID used for reconnection tracking:
event := sse.NewEvent("data").WithID("evt-123")

On reconnect, the client sends the header Last-Event-ID: evt-123.
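
To make the reconnection flow concrete, here is a small standard-library sketch of how a server might turn that header value into a resume position. It assumes IDs of the form evt-N with N counting up from 1, as in the examples in this guide; parseSeq and missedEvents are hypothetical helpers, not library functions. In a handler the input would come from r.Header.Get("Last-Event-ID").

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSeq extracts N from an ID of the form "evt-N".
// It returns 0 for an empty or malformed ID, meaning "replay everything".
func parseSeq(lastID string) int {
	if !strings.HasPrefix(lastID, "evt-") {
		return 0
	}
	n, err := strconv.Atoi(strings.TrimPrefix(lastID, "evt-"))
	if err != nil || n < 0 {
		return 0
	}
	return n
}

// missedEvents returns the payloads the client has not seen yet.
// Assumption: history[i] was sent with the ID "evt-(i+1)".
func missedEvents(history []string, lastID string) []string {
	n := parseSeq(lastID)
	if n > len(history) {
		n = len(history)
	}
	return history[n:]
}

func main() {
	history := []string{"a", "b", "c"}
	fmt.Println(missedEvents(history, "evt-1")) // [b c]
}
```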
Retry is the number of milliseconds the client waits before reconnecting:
event := sse.NewEvent("data").WithRetry(5000) // 5 seconds

Comments keep the connection alive:
comment := sse.Comment("keep-alive")
// Output: : keep-alive\n\n

Clients ignore comments, but they prevent timeouts.
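
The following sketch shows, from the receiving side, why comments are harmless: a simplified stream reader that skips any line starting with a colon. It is only an approximation of what an EventSource client does (it handles the data field alone), and parseData is a name invented for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// parseData reads a text/event-stream body and returns the data payload of
// each event. Comment lines (starting with ':') are skipped. Only the
// "data" field is handled, to keep the sketch short.
func parseData(stream string) []string {
	var events []string
	var data []string
	sc := bufio.NewScanner(strings.NewReader(stream))
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			// Blank line: dispatch the event if it carried any data.
			if len(data) > 0 {
				events = append(events, strings.Join(data, "\n"))
				data = nil
			}
		case strings.HasPrefix(line, ":"):
			// Comment such as ": keep-alive"; nothing to do.
		case strings.HasPrefix(line, "data:"):
			v := strings.TrimPrefix(line, "data:")
			data = append(data, strings.TrimPrefix(v, " "))
		}
	}
	return events
}

func main() {
	stream := ": connected\n\ndata: ping\n\n: keep-alive\n\ndata: line1\ndata: line2\n\n"
	for _, e := range parseData(stream) {
		fmt.Printf("%q\n", e)
	}
}
```

The two comment blocks produce no events; only the two data events are returned.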
The Upgrade() function converts an HTTP response to SSE:
conn, err := sse.Upgrade(w, r)
if err != nil {
// ResponseWriter doesn't support flushing
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()

What Upgrade does:
- Checks http.Flusher support
- Sets SSE headers (Content-Type, Cache-Control, etc.)
- Sends initial : connected comment
- Creates Conn with context
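
For readers who want to see what the first three steps look like at the net/http level, here is a standard-library sketch. It is not the library's Upgrade implementation: startStream, errNoFlusher and demo are hypothetical names, the exact header set the library sends is not documented here, and the Conn/context step is left out.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

var errNoFlusher = errors.New("response writer does not support flushing")

// startStream is a hypothetical stand-in for the steps listed above.
func startStream(w http.ResponseWriter) (http.Flusher, error) {
	// 1. Streaming needs a writer that can flush partial responses.
	flusher, ok := w.(http.Flusher)
	if !ok {
		return nil, errNoFlusher
	}
	// 2. Mark the response as an uncached event stream.
	h := w.Header()
	h.Set("Content-Type", "text/event-stream")
	h.Set("Cache-Control", "no-cache")
	// 3. Send an initial comment so the client sees bytes immediately.
	if _, err := io.WriteString(w, ": connected\n\n"); err != nil {
		return nil, err
	}
	flusher.Flush()
	return flusher, nil
}

// demo runs startStream against an in-memory recorder and reports the result.
func demo() (contentType, body string, err error) {
	rec := httptest.NewRecorder()
	if _, err = startStream(rec); err != nil {
		return "", "", err
	}
	return rec.Header().Get("Content-Type"), rec.Body.String(), nil
}

func main() {
	ct, body, err := demo()
	fmt.Printf("%s %q %v\n", ct, body, err)
}
```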
Three ways to send events:
Full control with Event builder:
event := sse.NewEvent("Order #123 shipped").
WithType("notification").
WithID("notif-456")
conn.Send(event)

Simple data-only event:
conn.SendData("Hello, World!")

Equivalent to:
conn.Send(sse.NewEvent("Hello, World!"))

JSON-encode and send:
user := map[string]string{"name": "Alice", "status": "online"}
conn.SendJSON(user)

Sends:
data: {"name":"Alice","status":"online"}
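
The JSON case can be pictured as marshal-then-frame. The sketch below reproduces the output above with encoding/json alone; jsonEvent is a hypothetical helper and is only assumed to resemble what SendJSON does internally.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jsonEvent marshals v and frames it as a single data-only event.
// Hypothetical helper, not part of the sse package.
func jsonEvent(v any) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	// json.Marshal produces compact output and escapes newlines inside
	// strings, so the payload always fits on one data line.
	return "data: " + string(b) + "\n\n", nil
}

func main() {
	user := map[string]string{"name": "Alice", "status": "online"}
	s, err := jsonEvent(user)
	if err != nil {
		panic(err)
	}
	fmt.Print(s)
}
```

Because the encoded payload never contains a raw newline, JSON events do not need the multi-line data handling described earlier.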
Connections respect context cancellation:
// Use request context (default)
conn, err := sse.Upgrade(w, r)
// Or custom context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
conn, err := sse.UpgradeWithContext(ctx, w, r)

When context is canceled, conn.Done() closes.
Always close connections:
defer conn.Close()

Close() is idempotent (safe to call multiple times).
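
One common way to get this kind of idempotent close in Go is sync.Once guarding a done channel. The sketch below shows that pattern on a hypothetical stream type; whether the library's Conn is implemented this way is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical connection-like type, not the library's Conn.
type stream struct {
	once sync.Once
	done chan struct{}
}

func newStream() *stream { return &stream{done: make(chan struct{})} }

// Close closes the done channel exactly once; later calls are no-ops.
func (s *stream) Close() error {
	s.once.Do(func() { close(s.done) })
	return nil
}

// Done returns a channel that is closed once Close has been called.
func (s *stream) Done() <-chan struct{} { return s.done }

// isClosed reports whether Close has been called, without blocking.
func isClosed(s *stream) bool {
	select {
	case <-s.Done():
		return true
	default:
		return false
	}
}

func main() {
	s := newStream()
	fmt.Println(isClosed(s)) // false
	s.Close()
	s.Close() // second call does not panic
	fmt.Println(isClosed(s)) // true
}
```

Without the sync.Once guard, the second Close would panic with "close of closed channel", which is why a deferred Close combined with an explicit one needs this protection.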
Wait for disconnection:
<-conn.Done()

Example with select:
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := conn.SendData("ping"); err != nil {
return
}
case <-conn.Done():
log.Println("Client disconnected")
return
}
}

Hub[T] manages broadcasting events to multiple clients.
hub := sse.NewHub[string]()
go hub.Run()
defer hub.Close()

Generic type parameter T:
- Hub[string] - broadcast strings
- Hub[Message] - broadcast custom types
- Hub[any] - broadcast any type
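
As a mental model for a generic fan-out type like this, the sketch below implements a tiny broadcaster over channels with the standard library. It is illustrative only: miniHub and its methods are invented names, it fans out to channels rather than SSE connections, and the real Hub (which is driven by Run) may work quite differently.

```go
package main

import (
	"fmt"
	"sync"
)

// miniHub is a hypothetical fan-out type used for illustration only.
type miniHub[T any] struct {
	mu   sync.Mutex
	subs map[chan T]struct{}
}

func newMiniHub[T any]() *miniHub[T] {
	return &miniHub[T]{subs: make(map[chan T]struct{})}
}

// Subscribe registers a new buffered channel and returns it.
func (h *miniHub[T]) Subscribe(buf int) chan T {
	ch := make(chan T, buf)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

// Unsubscribe removes the channel from the hub and closes it.
func (h *miniHub[T]) Unsubscribe(ch chan T) {
	h.mu.Lock()
	if _, ok := h.subs[ch]; ok {
		delete(h.subs, ch)
		close(ch)
	}
	h.mu.Unlock()
}

// Broadcast offers msg to every subscriber without blocking and returns
// how many accepted it. Subscribers with a full buffer are skipped.
func (h *miniHub[T]) Broadcast(msg T) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for ch := range h.subs {
		select {
		case ch <- msg:
			delivered++
		default: // slow subscriber: drop instead of stalling the others
		}
	}
	return delivered
}

func main() {
	hub := newMiniHub[string]()
	a, b := hub.Subscribe(1), hub.Subscribe(1)
	fmt.Println(hub.Broadcast("hello")) // 2
	fmt.Println(<-a, <-b)               // hello hello
	hub.Unsubscribe(a)
	fmt.Println(hub.Broadcast("again")) // 1
}
```

The non-blocking send is a design choice: one stalled subscriber cannot hold up delivery to the rest, at the cost of dropped messages for that subscriber.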
// Create
hub := sse.NewHub[Message]()
// Start (in goroutine)
go hub.Run()
// Use
hub.Register(conn)
hub.Broadcast(msg)
// Cleanup
defer hub.Close()

conn, err := sse.Upgrade(w, r)
if err != nil {
return
}
defer conn.Close()
// Register with hub
hub.Register(conn)
defer hub.Unregister(conn)
// Wait for disconnection
<-conn.Done()

// String hub
hub := sse.NewHub[string]()
hub.Broadcast("Hello, everyone!")
// Custom type hub
type Message struct {
User string
Text string
}
hub := sse.NewHub[Message]()
hub.Broadcast(Message{User: "Alice", Text: "Hi!"})

Custom types must implement fmt.Stringer or be JSON-serializable:
type Message struct {
User string `json:"user"`
Text string `json:"text"`
}
// Option 1: Implement Stringer
func (m Message) String() string {
data, _ := json.Marshal(m)
return string(data)
}
// Option 2: Let Hub JSON-encode (automatic)
hub := sse.NewHub[Message]()
hub.Broadcast(Message{User: "Alice", Text: "Hi"})

Track active clients:
count := hub.Clients()
log.Printf("Active clients: %d", count)

Hub automatically removes failed clients:
// Client disconnects → removed from hub
// Send fails → client unregistered
// Hub closes → all clients disconnected

conn, err := sse.Upgrade(w, r)
if err != nil {
return
}
defer conn.Close()

ctx, cancel := context.WithTimeout(r.Context(), 10*time.Minute)
defer cancel()
conn, err := sse.UpgradeWithContext(ctx, w, r)

Prevent proxy/firewall timeouts:
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
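// Send a comment as a heartbeat. This writes to w directly, so flusher
// is assumed to be w.(http.Flusher), asserted once before the loop.
io.WriteString(w, ": keep-alive\n\n")
flusher.Flush()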
case <-conn.Done():
return
}
}

eventID := 0
for msg := range messages {
eventID++
event := sse.NewEvent(msg).WithID(fmt.Sprintf("evt-%d", eventID))
conn.Send(event)
}
// On reconnect, check Last-Event-ID header
lastID := r.Header.Get("Last-Event-ID")
if lastID != "" {
// Resume from lastID
}

ctx, cancel := context.WithTimeout(r.Context(), 1*time.Hour)
defer cancel()
conn, err := sse.UpgradeWithContext(ctx, w, r)

const maxClients = 1000
if hub.Clients() >= maxClients {
http.Error(w, "Too many clients", http.StatusServiceUnavailable)
return
}
hub.Register(conn)

import "log/slog"
slog.Info("Client connected",
"remote", r.RemoteAddr,
"clients", hub.Clients(),
)

// Close hub first (disconnects clients gracefully)
hub.Close()
// Then shutdown HTTP server
server.Shutdown(ctx)

conn, err := sse.Upgrade(w, r)
if err != nil {
if errors.Is(err, sse.ErrNoFlusher) {
// ResponseWriter doesn't support flushing
log.Println("Incompatible server or proxy")
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if err := conn.Send(event); err != nil {
if errors.Is(err, sse.ErrConnectionClosed) {
// Connection already closed
return
}
log.Printf("Send error: %v", err)
return
}

if err := hub.Register(conn); err != nil {
if errors.Is(err, sse.ErrHubClosed) {
// Hub is shutting down
http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
return
}
}

for {
select {
case msg := <-messages:
if err := conn.SendData(msg); err != nil {
// Client disconnected or send failed
log.Printf("Client error: %v", err)
return
}
case <-conn.Done():
// Clean disconnect
log.Println("Client disconnected")
return
}
}

location /events {
proxy_pass http://backend;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
proxy_buffering off;
proxy_cache off;
}

<Location /events>
ProxyPass http://backend/events
ProxyPassReverse http://backend/events
SetEnv proxy-sendcl 1
</Location>

func handleSSE(w http.ResponseWriter, r *http.Request) {
// CORS headers
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Last-Event-ID")
if r.Method == "OPTIONS" {
return
}
conn, err := sse.Upgrade(w, r)
// ...
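}

func handleSSE(w http.ResponseWriter, r *http.Request) {
// Validate JWT token. Note: the browser EventSource API cannot set custom
// headers, so browser clients need a cookie or query parameter instead.
token := r.Header.Get("Authorization")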
if !validateToken(token) {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
conn, err := sse.Upgrade(w, r)
// ...
}

type RateLimiter struct {
clients map[string]*rate.Limiter
mu sync.Mutex
}
func (rl *RateLimiter) Allow(ip string) bool {
rl.mu.Lock()
defer rl.mu.Unlock()
limiter, ok := rl.clients[ip]
if !ok {
limiter = rate.NewLimiter(10, 20) // 10 req/s, burst 20
rl.clients[ip] = limiter
}
return limiter.Allow()
}

import "github.com/prometheus/client_golang/prometheus"
var (
sseConnections = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "sse_connections_active",
Help: "Number of active SSE connections",
})
sseEvents = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sse_events_sent_total",
Help: "Total number of SSE events sent",
})
)
func init() {
prometheus.MustRegister(sseConnections)
prometheus.MustRegister(sseEvents)
}
func handleSSE(w http.ResponseWriter, r *http.Request) {
conn, err := sse.Upgrade(w, r)
if err != nil {
return
}
defer conn.Close()
sseConnections.Inc()
defer sseConnections.Dec()
for event := range events {
// Count only events that were actually written
if err := conn.Send(event); err != nil {
return
}
sseEvents.Inc()
}
}

const maxConnections = 10000
var activeConnections atomic.Int64
func handleSSE(w http.ResponseWriter, r *http.Request) {
// Reserve a slot first, then check: a separate Load followed by Add
// would let concurrent requests exceed the limit.
if activeConnections.Add(1) > maxConnections {
activeConnections.Add(-1)
http.Error(w, "Too many connections", http.StatusServiceUnavailable)
return
}
defer activeConnections.Add(-1)
// ...
}

type ChatServer struct {
rooms map[string]*sse.Hub[Message]
mu sync.RWMutex
}
func (cs *ChatServer) GetRoom(name string) *sse.Hub[Message] {
cs.mu.Lock()
defer cs.mu.Unlock()
hub, ok := cs.rooms[name]
if !ok {
hub = sse.NewHub[Message]()
go hub.Run()
cs.rooms[name] = hub
}
return hub
}
func (cs *ChatServer) handleEvents(w http.ResponseWriter, r *http.Request) {
room := r.URL.Query().Get("room")
if room == "" {
room = "default"
}
hub := cs.GetRoom(room)
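// Do not ignore the Upgrade error: conn is unusable when it fails
conn, err := sse.Upgrade(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
hub.Register(conn)
defer hub.Unregister(conn)
<-conn.Done()
}

type EventLog struct {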
events []sse.Event
mu sync.RWMutex
}
func (el *EventLog) Add(event *sse.Event) {
el.mu.Lock()
el.events = append(el.events, *event)
el.mu.Unlock()
}
func (el *EventLog) ReplayFrom(conn *sse.Conn, lastID string) {
el.mu.RLock()
defer el.mu.RUnlock()
found := false
for _, event := range el.events {
if event.ID == lastID {
found = true
continue
}
if found || lastID == "" {
conn.Send(&event)
}
}
}
func handleSSE(w http.ResponseWriter, r *http.Request) {
conn, err := sse.Upgrade(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
// Replay missed events
lastID := r.Header.Get("Last-Event-ID")
eventLog.ReplayFrom(conn, lastID)
// Continue with live events
// ...
}

type FilteredHub struct {
hub *sse.Hub[Message]
filters map[*sse.Conn]func(Message) bool
mu sync.RWMutex
}
func (fh *FilteredHub) RegisterWithFilter(conn *sse.Conn, filter func(Message) bool) {
fh.hub.Register(conn)
fh.mu.Lock()
fh.filters[conn] = filter
fh.mu.Unlock()
}
func (fh *FilteredHub) Broadcast(msg Message) {
fh.mu.RLock()
defer fh.mu.RUnlock()
for conn, filter := range fh.filters {
if filter(msg) {
conn.SendJSON(msg)
}
}
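}

func handleSSE(w http.ResponseWriter, r *http.Request) {
	conn, err := sse.Upgrade(w, r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer conn.Close()
	// Buffered channel absorbs short bursts; closing it on return
	// stops the sender goroutine so it does not leak
	events := make(chan string, 100)
	defer close(events)
	// Sender goroutine
	go func() {
		for event := range events {
			if err := conn.SendData(event); err != nil {
				return
			}
		}
	}()
	// Producer: stops when the source closes or the client disconnects
	for {
		select {
		case msg, ok := <-messages:
			if !ok {
				return
			}
			select {
			case events <- msg:
				// Queued successfully
			default:
				// Channel full, drop or handle
				log.Println("Slow client, dropping message")
			}
		case <-conn.Done():
			return
		}
	}
}

✅ Use SSE when: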
- Server pushes updates to client (one-way)
- Simple text-based events
- Built-in reconnection needed
- HTTP infrastructure (proxies, load balancers)
- Simpler implementation
- EventSource API convenience
✅ Use WebSocket when:
- Bidirectional communication needed
- Binary data transfer
- Very high frequency (>10 events/sec)
- Custom protocol required
- Lower latency critical
- Gaming, video, audio streaming
| Feature | SSE | WebSocket |
|---|---|---|
| Protocol | HTTP | WebSocket (ws://, over TCP) |
| Direction | Server→Client | Bidirectional |
| Data Format | Text (UTF-8) | Binary + Text |
| Reconnection | Automatic | Manual |
| Event Types | Built-in | Custom |
| Browser API | EventSource | WebSocket |
| Proxies | Works well | May need config |
| Overhead | Low | Higher |
| Latency | ~100ms | ~10ms |
| Max Connections | ~6 per domain (browser limit on HTTP/1.1; much higher over HTTP/2) | Not subject to the per-domain HTTP limit |
See the examples/ directory for complete working examples.
See pkg.go.dev/github.com/coregx/stream/sse for full API documentation.
Last updated: 2025-01-18
Version: v0.1.0-alpha