Skip to content

Commit 0fa554d

Browse files
committed
Replace OnSnapshot callback with Emitter interface
- Add Emitter interface to screentracker package - Remove OnSnapshot from PTYConversationConfig, accept Emitter in NewPTY - Rename EventEmitter methods: EmitMessages, EmitStatus, EmitScreen - Accept agentType at NewEventEmitter construction instead of per-call - Update server.go wiring, all tests pass
1 parent c171c14 commit 0fa554d

6 files changed

Lines changed: 66 additions & 51 deletions

File tree

lib/httpapi/events.go

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,33 @@ func convertStatus(status st.ConversationStatus) AgentStatus {
8181
}
8282
}
8383

84-
// subscriptionBufSize is the size of the buffer for each subscription.
85-
// Once the buffer is full, the channel will be closed.
86-
// Listeners must actively drain the channel, so it's important to
87-
// set this to a value that is large enough to handle the expected
88-
// number of events.
89-
func NewEventEmitter(subscriptionBufSize int) *EventEmitter {
90-
return &EventEmitter{
91-
mu: sync.Mutex{},
84+
const defaultSubscriptionBufSize = 1024
85+
86+
type EventEmitterOption func(*EventEmitter)
87+
88+
func WithSubscriptionBufSize(size int) EventEmitterOption {
89+
return func(e *EventEmitter) {
90+
e.subscriptionBufSize = size
91+
}
92+
}
93+
94+
func WithAgentType(agentType mf.AgentType) EventEmitterOption {
95+
return func(e *EventEmitter) {
96+
e.agentType = agentType
97+
}
98+
}
99+
100+
func NewEventEmitter(opts ...EventEmitterOption) *EventEmitter {
101+
e := &EventEmitter{
92102
messages: make([]st.ConversationMessage, 0),
93103
status: AgentStatusRunning,
94104
chans: make(map[int]chan Event),
95-
chanIdx: 0,
96-
subscriptionBufSize: subscriptionBufSize,
105+
subscriptionBufSize: defaultSubscriptionBufSize,
106+
}
107+
for _, opt := range opts {
108+
opt(e)
97109
}
110+
return e
98111
}
99112

100113
// Assumes the caller holds the lock.
@@ -122,7 +135,7 @@ func (e *EventEmitter) notifyChannels(eventType EventType, payload any) {
122135

123136
// Assumes that only the last message can change or new messages can be added.
124137
// If a new message is injected between existing messages (identified by Id), the behavior is undefined.
125-
func (e *EventEmitter) UpdateMessagesAndEmitChanges(newMessages []st.ConversationMessage) {
138+
func (e *EventEmitter) EmitMessages(newMessages []st.ConversationMessage) {
126139
e.mu.Lock()
127140
defer e.mu.Unlock()
128141

@@ -149,7 +162,7 @@ func (e *EventEmitter) UpdateMessagesAndEmitChanges(newMessages []st.Conversatio
149162
e.messages = newMessages
150163
}
151164

152-
func (e *EventEmitter) UpdateStatusAndEmitChanges(newStatus st.ConversationStatus, agentType mf.AgentType) {
165+
func (e *EventEmitter) EmitStatus(newStatus st.ConversationStatus) {
153166
e.mu.Lock()
154167
defer e.mu.Unlock()
155168

@@ -158,12 +171,11 @@ func (e *EventEmitter) UpdateStatusAndEmitChanges(newStatus st.ConversationStatu
158171
return
159172
}
160173

161-
e.notifyChannels(EventTypeStatusChange, StatusChangeBody{Status: newAgentStatus, AgentType: agentType})
174+
e.notifyChannels(EventTypeStatusChange, StatusChangeBody{Status: newAgentStatus, AgentType: e.agentType})
162175
e.status = newAgentStatus
163-
e.agentType = agentType
164176
}
165177

166-
func (e *EventEmitter) UpdateScreenAndEmitChanges(newScreen string) {
178+
func (e *EventEmitter) EmitScreen(newScreen string) {
167179
e.mu.Lock()
168180
defer e.mu.Unlock()
169181

lib/httpapi/events_test.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@ import (
55
"testing"
66
"time"
77

8-
mf "github.com/coder/agentapi/lib/msgfmt"
98
st "github.com/coder/agentapi/lib/screentracker"
109
"github.com/stretchr/testify/assert"
1110
)
1211

1312
func TestEventEmitter(t *testing.T) {
1413
t.Run("single-subscription", func(t *testing.T) {
15-
emitter := NewEventEmitter(10)
14+
emitter := NewEventEmitter(WithSubscriptionBufSize(10))
1615
_, ch, stateEvents := emitter.Subscribe()
1716
assert.Empty(t, ch)
1817
assert.Equal(t, []Event{
@@ -27,7 +26,7 @@ func TestEventEmitter(t *testing.T) {
2726
}, stateEvents)
2827

2928
now := time.Now()
30-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
29+
emitter.EmitMessages([]st.ConversationMessage{
3130
{Id: 1, Message: "Hello, world!", Role: st.ConversationRoleUser, Time: now},
3231
})
3332
newEvent := <-ch
@@ -36,7 +35,7 @@ func TestEventEmitter(t *testing.T) {
3635
Payload: MessageUpdateBody{Id: 1, Message: "Hello, world!", Role: st.ConversationRoleUser, Time: now},
3736
}, newEvent)
3837

39-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
38+
emitter.EmitMessages([]st.ConversationMessage{
4039
{Id: 1, Message: "Hello, world! (updated)", Role: st.ConversationRoleUser, Time: now},
4140
{Id: 2, Message: "What's up?", Role: st.ConversationRoleAgent, Time: now},
4241
})
@@ -52,24 +51,24 @@ func TestEventEmitter(t *testing.T) {
5251
Payload: MessageUpdateBody{Id: 2, Message: "What's up?", Role: st.ConversationRoleAgent, Time: now},
5352
}, newEvent)
5453

55-
emitter.UpdateStatusAndEmitChanges(st.ConversationStatusStable, mf.AgentTypeAider)
54+
emitter.EmitStatus(st.ConversationStatusStable)
5655
newEvent = <-ch
5756
assert.Equal(t, Event{
5857
Type: EventTypeStatusChange,
59-
Payload: StatusChangeBody{Status: AgentStatusStable, AgentType: mf.AgentTypeAider},
58+
Payload: StatusChangeBody{Status: AgentStatusStable, AgentType: ""},
6059
}, newEvent)
6160
})
6261

6362
t.Run("multiple-subscriptions", func(t *testing.T) {
64-
emitter := NewEventEmitter(10)
63+
emitter := NewEventEmitter(WithSubscriptionBufSize(10))
6564
channels := make([]<-chan Event, 0, 10)
6665
for i := 0; i < 10; i++ {
6766
_, ch, _ := emitter.Subscribe()
6867
channels = append(channels, ch)
6968
}
7069
now := time.Now()
7170

72-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
71+
emitter.EmitMessages([]st.ConversationMessage{
7372
{Id: 1, Message: "Hello, world!", Role: st.ConversationRoleUser, Time: now},
7473
})
7574
for _, ch := range channels {
@@ -82,10 +81,10 @@ func TestEventEmitter(t *testing.T) {
8281
})
8382

8483
t.Run("close-channel", func(t *testing.T) {
85-
emitter := NewEventEmitter(1)
84+
emitter := NewEventEmitter(WithSubscriptionBufSize(1))
8685
_, ch, _ := emitter.Subscribe()
8786
for i := range 5 {
88-
emitter.UpdateMessagesAndEmitChanges([]st.ConversationMessage{
87+
emitter.EmitMessages([]st.ConversationMessage{
8988
{Id: i, Message: fmt.Sprintf("Hello, world! %d", i), Role: st.ConversationRoleUser, Time: time.Now()},
9089
})
9190
}

lib/httpapi/server.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
244244
return mf.FormatToolCall(config.AgentType, message)
245245
}
246246

247-
emitter := NewEventEmitter(1024)
247+
emitter := NewEventEmitter(WithAgentType(config.AgentType))
248248

249249
// Format initial prompt into message parts if provided
250250
var initialPrompt []st.MessagePart
@@ -262,16 +262,8 @@ func NewServer(ctx context.Context, config ServerConfig) (*Server, error) {
262262
ReadyForInitialPrompt: isAgentReadyForInitialPrompt,
263263
FormatToolCall: formatToolCall,
264264
InitialPrompt: initialPrompt,
265-
// OnSnapshot uses a callback rather than passing the emitter directly
266-
// to keep the screentracker package decoupled from httpapi concerns.
267-
// This preserves clean package boundaries and avoids import cycles.
268-
OnSnapshot: func(status st.ConversationStatus, messages []st.ConversationMessage, screen string) {
269-
emitter.UpdateStatusAndEmitChanges(status, config.AgentType)
270-
emitter.UpdateMessagesAndEmitChanges(messages)
271-
emitter.UpdateScreenAndEmitChanges(screen)
272-
},
273-
Logger: logger,
274-
})
265+
Logger: logger,
266+
}, emitter)
275267

276268
// Create temporary directory for uploads
277269
tempDir, err := os.MkdirTemp("", "agentapi-uploads-")

lib/screentracker/conversation.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ type Conversation interface {
6565
Text() string
6666
}
6767

68+
// Emitter receives conversation state updates.
69+
type Emitter interface {
70+
EmitMessages([]ConversationMessage)
71+
EmitStatus(ConversationStatus)
72+
EmitScreen(string)
73+
}
74+
6875
type ConversationMessage struct {
6976
Id int
7077
Message string

lib/screentracker/pty_conversation.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ type PTYConversationConfig struct {
6868
FormatToolCall func(message string) (string, []string)
6969
// InitialPrompt is the initial prompt to send to the agent once ready
7070
InitialPrompt []MessagePart
71-
// OnSnapshot is called after each snapshot with current status, messages, and screen content
72-
OnSnapshot func(status ConversationStatus, messages []ConversationMessage, screen string)
7371
Logger *slog.Logger
7472
}
7573

@@ -86,7 +84,8 @@ func (cfg PTYConversationConfig) getStableSnapshotsThreshold() int {
8684
// PTYConversation is a conversation that uses a pseudo-terminal (PTY) for communication.
8785
// It uses a combination of polling and diffs to detect changes in the screen.
8886
type PTYConversation struct {
89-
cfg PTYConversationConfig
87+
cfg PTYConversationConfig
88+
emitter Emitter
9089
// How many stable snapshots are required to consider the screen stable
9190
stableSnapshotsThreshold int
9291
snapshotBuffer *RingBuffer[screenSnapshot]
@@ -115,13 +114,14 @@ type PTYConversation struct {
115114

116115
var _ Conversation = &PTYConversation{}
117116

118-
func NewPTY(ctx context.Context, cfg PTYConversationConfig) *PTYConversation {
117+
func NewPTY(ctx context.Context, cfg PTYConversationConfig, emitter Emitter) *PTYConversation {
119118
if cfg.Clock == nil {
120119
cfg.Clock = quartz.NewReal()
121120
}
122121
threshold := cfg.getStableSnapshotsThreshold()
123122
c := &PTYConversation{
124123
cfg: cfg,
124+
emitter: emitter,
125125
stableSnapshotsThreshold: threshold,
126126
snapshotBuffer: NewRingBuffer[screenSnapshot](threshold),
127127
messages: []ConversationMessage{
@@ -139,9 +139,6 @@ func NewPTY(ctx context.Context, cfg PTYConversationConfig) *PTYConversation {
139139
if len(cfg.InitialPrompt) > 0 {
140140
c.outboundQueue <- outboundMessage{parts: cfg.InitialPrompt, errCh: nil}
141141
}
142-
if c.cfg.OnSnapshot == nil {
143-
c.cfg.OnSnapshot = func(ConversationStatus, []ConversationMessage, string) {}
144-
}
145142
if c.cfg.ReadyForInitialPrompt == nil {
146143
c.cfg.ReadyForInitialPrompt = func(string) bool { return true }
147144
}
@@ -173,7 +170,9 @@ func (c *PTYConversation) Start(ctx context.Context) {
173170
}
174171
c.lock.Unlock()
175172

176-
c.cfg.OnSnapshot(status, messages, screen)
173+
c.emitter.EmitStatus(status)
174+
c.emitter.EmitMessages(messages)
175+
c.emitter.EmitScreen(screen)
177176
return nil
178177
}, "snapshot")
179178

lib/screentracker/pty_conversation_test.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ func (a *testAgent) setScreen(s string) {
4949
a.screen = s
5050
}
5151

52+
type testEmitter struct{}
53+
54+
func (testEmitter) EmitMessages([]st.ConversationMessage) {}
55+
func (testEmitter) EmitStatus(st.ConversationStatus) {}
56+
func (testEmitter) EmitScreen(string) {}
57+
5258
// advanceFor is a shorthand for advanceUntil with a time-based condition.
5359
func advanceFor(ctx context.Context, t *testing.T, mClock *quartz.Mock, total time.Duration) {
5460
t.Helper()
@@ -125,7 +131,7 @@ func statusTest(t *testing.T, params statusTestParams) {
125131
params.cfg.AgentIO = agent
126132
params.cfg.Logger = slog.New(slog.NewTextHandler(io.Discard, nil))
127133

128-
c := st.NewPTY(ctx, params.cfg)
134+
c := st.NewPTY(ctx, params.cfg, &testEmitter{})
129135
c.Start(ctx)
130136

131137
assert.Equal(t, st.ConversationStatusInitializing, c.Status())
@@ -233,7 +239,7 @@ func TestMessages(t *testing.T) {
233239
agent = a
234240
}
235241

236-
c := st.NewPTY(ctx, cfg)
242+
c := st.NewPTY(ctx, cfg, &testEmitter{})
237243
c.Start(ctx)
238244

239245
return c, agent, mClock
@@ -460,7 +466,7 @@ func TestInitialPromptReadiness(t *testing.T) {
460466
Logger: discardLogger,
461467
}
462468

463-
c := st.NewPTY(ctx, cfg)
469+
c := st.NewPTY(ctx, cfg, &testEmitter{})
464470
c.Start(ctx)
465471

466472
// Take a snapshot with "loading...". Threshold is 1 (stability 0 / interval 1s = 0 + 1 = 1).
@@ -488,7 +494,7 @@ func TestInitialPromptReadiness(t *testing.T) {
488494
Logger: discardLogger,
489495
}
490496

491-
c := st.NewPTY(ctx, cfg)
497+
c := st.NewPTY(ctx, cfg, &testEmitter{})
492498
c.Start(ctx)
493499

494500
// Agent not ready initially.
@@ -524,7 +530,7 @@ func TestInitialPromptReadiness(t *testing.T) {
524530
Logger: discardLogger,
525531
}
526532

527-
c := st.NewPTY(ctx, cfg)
533+
c := st.NewPTY(ctx, cfg, &testEmitter{})
528534
c.Start(ctx)
529535

530536
// Status is "changing" while waiting for readiness.
@@ -564,7 +570,7 @@ func TestInitialPromptReadiness(t *testing.T) {
564570
Logger: discardLogger,
565571
}
566572

567-
c := st.NewPTY(ctx, cfg)
573+
c := st.NewPTY(ctx, cfg, &testEmitter{})
568574
c.Start(ctx)
569575

570576
advanceFor(ctx, t, mClock, 1*time.Second)
@@ -586,7 +592,7 @@ func TestInitialPromptReadiness(t *testing.T) {
586592
Logger: discardLogger,
587593
}
588594

589-
c := st.NewPTY(ctx, cfg)
595+
c := st.NewPTY(ctx, cfg, &testEmitter{})
590596
c.Start(ctx)
591597

592598
// Fill buffer to reach stability with "ready" screen.

0 commit comments

Comments
 (0)