Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions distribution/control_broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package distribution

import (
"context"
"sync"
)

// viewerControlBuffer is the per-subscriber channel buffer size for control
// messages. Matches viewerCaptionBuffer since control messages are similarly
// low-frequency.
const viewerControlBuffer = 10

// ControlBroadcaster fans out messages from a single source channel to
// multiple subscriber channels. Each subscriber gets its own buffered
// channel; slow subscribers have messages dropped (non-blocking send)
// rather than blocking other subscribers or the source.
type ControlBroadcaster struct {
mu sync.RWMutex
subscribers map[string]chan []byte
}

// NewControlBroadcaster creates a ControlBroadcaster ready for use.
func NewControlBroadcaster() *ControlBroadcaster {
return &ControlBroadcaster{
subscribers: make(map[string]chan []byte),
}
}

// Subscribe creates a per-subscriber buffered channel and returns it.
// The caller must call Unsubscribe when done. If a channel already
// exists for the given id, it is closed and replaced.
func (b *ControlBroadcaster) Subscribe(id string) <-chan []byte {
ch := make(chan []byte, viewerControlBuffer)
b.mu.Lock()
if old, ok := b.subscribers[id]; ok {
close(old)
}
b.subscribers[id] = ch
b.mu.Unlock()
return ch
}

// Unsubscribe removes and closes the subscriber's channel. It is safe
// to call multiple times for the same id.
func (b *ControlBroadcaster) Unsubscribe(id string) {
b.mu.Lock()
ch, ok := b.subscribers[id]
if ok {
delete(b.subscribers, id)
close(ch)
}
b.mu.Unlock()
}

// Run reads from the source channel and fans out each message to all
// subscribers. It blocks until ctx is cancelled or the source channel
// is closed. Non-blocking sends: if a subscriber's channel is full,
// the message is dropped for that subscriber (matching the Viewer
// drop pattern).
func (b *ControlBroadcaster) Run(ctx context.Context, source <-chan []byte) {
for {
select {
case <-ctx.Done():
b.closeAll()
return
case data, ok := <-source:
if !ok {
b.closeAll()
return
}
b.mu.RLock()
for _, ch := range b.subscribers {
select {
case ch <- data:
default:
// subscriber is slow; drop message
}
}
b.mu.RUnlock()
}
}
}

// closeAll closes and removes all subscriber channels.
func (b *ControlBroadcaster) closeAll() {
b.mu.Lock()
for id, ch := range b.subscribers {
close(ch)
delete(b.subscribers, id)
}
b.mu.Unlock()
}
146 changes: 146 additions & 0 deletions distribution/control_broadcast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package distribution

import (
"context"
"testing"
"time"
)

func TestControlBroadcasterFanOut(t *testing.T) {
t.Parallel()
b := NewControlBroadcaster()
source := make(chan []byte, 10)
go b.Run(context.Background(), source)

ch1 := b.Subscribe("s1")
ch2 := b.Subscribe("s2")

source <- []byte(`{"state":"live"}`)

// Both subscribers should receive the same message.
for _, tc := range []struct {
name string
ch <-chan []byte
}{
{"s1", ch1},
{"s2", ch2},
} {
select {
case data := <-tc.ch:
if string(data) != `{"state":"live"}` {
t.Fatalf("%s got %q", tc.name, data)
}
case <-time.After(time.Second):
t.Fatalf("%s timeout", tc.name)
}
}
}

func TestControlBroadcasterUnsubscribe(t *testing.T) {
t.Parallel()
b := NewControlBroadcaster()
source := make(chan []byte, 10)
go b.Run(context.Background(), source)

ch1 := b.Subscribe("s1")
_ = b.Subscribe("s2")

b.Unsubscribe("s2")

source <- []byte(`{"state":"off"}`)

// s1 should still receive.
select {
case <-ch1:
case <-time.After(time.Second):
t.Fatal("s1 timeout after s2 unsubscribe")
}
}

func TestControlBroadcasterUnsubscribeIdempotent(t *testing.T) {
t.Parallel()
b := NewControlBroadcaster()
source := make(chan []byte, 10)
go b.Run(context.Background(), source)

b.Subscribe("s1")
b.Unsubscribe("s1")
b.Unsubscribe("s1") // second call must not panic
}

func TestControlBroadcasterSourceClose(t *testing.T) {
t.Parallel()
b := NewControlBroadcaster()
source := make(chan []byte)
go b.Run(context.Background(), source)

ch := b.Subscribe("s1")
close(source)

// Subscriber channel should be closed.
select {
case _, ok := <-ch:
if ok {
t.Fatal("expected channel to be closed")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for channel close")
}
}

func TestControlBroadcasterContextCancel(t *testing.T) {
t.Parallel()
b := NewControlBroadcaster()
source := make(chan []byte) // unbuffered, never closed

ctx, cancel := context.WithCancel(context.Background())
go b.Run(ctx, source)

ch := b.Subscribe("s1")
cancel()

// Subscriber channel should be closed when context is cancelled.
select {
case _, ok := <-ch:
if ok {
t.Fatal("expected channel to be closed")
}
case <-time.After(time.Second):
t.Fatal("timeout waiting for channel close after ctx cancel")
}
}

func TestControlBroadcasterDropOnFull(t *testing.T) {
t.Parallel()
b := NewControlBroadcaster()
source := make(chan []byte, 100)
go b.Run(context.Background(), source)

ch := b.Subscribe("s1")

// Send more messages than the buffer can hold.
for i := 0; i < viewerControlBuffer+5; i++ {
source <- []byte(`{"i":"data"}`)
}

// Allow broadcaster goroutine to process.
time.Sleep(50 * time.Millisecond)

// Should have exactly viewerControlBuffer messages (extras dropped).
if len(ch) != viewerControlBuffer {
t.Fatalf("channel length = %d, want %d", len(ch), viewerControlBuffer)
}
}

func TestControlBroadcasterNoSubscribers(t *testing.T) {
t.Parallel()
b := NewControlBroadcaster()
source := make(chan []byte, 10)
go b.Run(context.Background(), source)

// Send with no subscribers — should not block or panic.
source <- []byte(`{"empty":"room"}`)

// Allow processing, then close cleanly.
close(source)
}
14 changes: 13 additions & 1 deletion distribution/moq_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ type moqSelectionParams struct {
}

// buildMoQCatalog assembles the catalog JSON for a stream.
func buildMoQCatalog(streamKey string, relay *Relay) ([]byte, error) {
// If controlEnabled is true, a "control" track is included in the catalog
// for application-level state broadcast (e.g., switcher control room state).
func buildMoQCatalog(streamKey string, relay *Relay, controlEnabled bool) ([]byte, error) {
vi := relay.VideoInfo()
ai := relay.AudioInfo()

Expand Down Expand Up @@ -98,6 +100,16 @@ func buildMoQCatalog(streamKey string, relay *Relay) ([]byte, error) {
},
})

// Control track (application-level state broadcast as JSON)
if controlEnabled {
catalog.Tracks = append(catalog.Tracks, moqCatalogTrack{
Name: "control",
SelectionParams: moqSelectionParams{
Codec: "application/json",
},
})
}

return json.Marshal(catalog)
}

Expand Down
50 changes: 45 additions & 5 deletions distribution/moq_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
func TestBuildMoQCatalogBasic(t *testing.T) {
t.Parallel()
relay := NewRelay()
data, err := buildMoQCatalog("teststream", relay)
data, err := buildMoQCatalog("teststream", relay, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestBuildMoQCatalogMultiAudio(t *testing.T) {
relay := NewRelay()
relay.SetAudioTrackCount(3)

data, err := buildMoQCatalog("multi", relay)
data, err := buildMoQCatalog("multi", relay, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func TestBuildMoQCatalogCustomVideoInfo(t *testing.T) {
relay.videoInfoSet = true
relay.mu.Unlock()

data, err := buildMoQCatalog("4k", relay)
data, err := buildMoQCatalog("4k", relay, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -136,7 +136,7 @@ func TestBuildMoQCatalogCustomVideoInfo(t *testing.T) {
func TestBuildMoQCatalogJSONFieldNames(t *testing.T) {
t.Parallel()
relay := NewRelay()
data, err := buildMoQCatalog("test", relay)
data, err := buildMoQCatalog("test", relay, false)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -168,7 +168,7 @@ func TestBuildMoQCatalogCustomAudioInfo(t *testing.T) {
relay := NewRelay()
relay.SetAudioInfo(AudioInfo{Codec: "mp4a.40.05", SampleRate: 44100, Channels: 1})

data, err := buildMoQCatalog("custom-audio", relay)
data, err := buildMoQCatalog("custom-audio", relay, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -189,3 +189,43 @@ func TestBuildMoQCatalogCustomAudioInfo(t *testing.T) {
t.Fatalf("audio channelConfig = %q", ap.ChannelConfig)
}
}

func TestBuildMoQCatalogControlTrack(t *testing.T) {
t.Parallel()
relay := NewRelay()

// Without control enabled: 4 tracks (video + audio0 + captions + stats)
dataNoControl, err := buildMoQCatalog("test", relay, false)
if err != nil {
t.Fatal(err)
}
var catNoControl moqCatalog
if err := json.Unmarshal(dataNoControl, &catNoControl); err != nil {
t.Fatal(err)
}
if len(catNoControl.Tracks) != 4 {
t.Fatalf("without control: track count = %d, want 4", len(catNoControl.Tracks))
}

// With control enabled: 5 tracks (video + audio0 + captions + stats + control)
dataWithControl, err := buildMoQCatalog("test", relay, true)
if err != nil {
t.Fatal(err)
}
var catWithControl moqCatalog
if err := json.Unmarshal(dataWithControl, &catWithControl); err != nil {
t.Fatal(err)
}
if len(catWithControl.Tracks) != 5 {
t.Fatalf("with control: track count = %d, want 5", len(catWithControl.Tracks))
}

// Verify the control track is last and has the right codec
controlTrack := catWithControl.Tracks[4]
if controlTrack.Name != "control" {
t.Fatalf("control track name = %q, want %q", controlTrack.Name, "control")
}
if controlTrack.SelectionParams.Codec != "application/json" {
t.Fatalf("control track codec = %q, want %q", controlTrack.SelectionParams.Codec, "application/json")
}
}
Loading