Skip to content

Commit 8670852

Browse files
intel352claude
andcommitted
feat: add sdk package with reusable daemon-client infrastructure
Extracts the daemon/IPC client pattern from workflow-cardgame into a game-agnostic sdk package so other agent CLI tools can reuse it without copying boilerplate. - sdk/session.go — SessionState, LoadSession, SaveSession - sdk/events.go — EventWriter (append-only JSONL with seq+timestamp) - sdk/daemon.go — DaemonClient (WS connect, session resume, PID file, fan-out) - sdk/ipc.go — IPCServer + CommandClient + SocketPath utility - sdk/util.go — WSURLToHTTP, HTTPGET, IsTerminal - sdk/session_test.go — round-trip + invalid JSON tests (both pass) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 6f77025 commit 8670852

8 files changed

Lines changed: 641 additions & 0 deletions

File tree

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ require (
9393
github.com/google/s2a-go v0.1.9 // indirect
9494
github.com/googleapis/enterprise-certificate-proxy v0.3.14 // indirect
9595
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
96+
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
9697
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
9798
github.com/hashicorp/errwrap v1.1.0 // indirect
9899
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ
222222
github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY=
223223
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
224224
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
225+
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
226+
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
225227
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs=
226228
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c=
227229
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=

sdk/daemon.go

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
package sdk
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"os"
8+
"sync"
9+
"time"
10+
11+
"github.com/gorilla/websocket"
12+
)
13+
14+
// DaemonConfig configures a DaemonClient.
15+
type DaemonConfig struct {
16+
// WSURL is the WebSocket URL to connect to (e.g. "ws://localhost:8080/ws").
17+
WSURL string
18+
// SessionPath is the path to the session JSON file.
19+
// The daemon also writes a PID file at SessionPath+".pid" and events at
20+
// SessionPath+".events".
21+
SessionPath string
22+
// OnMessage is called for every WebSocket message received, including the
23+
// welcome message. It is called from the dispatcher goroutine; do not block.
24+
OnMessage func(data []byte)
25+
// BroadcastFilter returns true if data is a known server broadcast that
26+
// should never be treated as a direct response to a command. Used by the
27+
// IPC server to skip stale messages while waiting for command responses.
28+
// If nil, no filtering is applied.
29+
BroadcastFilter func(data []byte) bool
30+
}
31+
32+
// ResponseSubscriber receives a copy of every WebSocket message via fan-out.
33+
// Register one with DaemonClient.Subscribe before sending a command to ensure
34+
// no messages are missed between send and receive.
35+
type ResponseSubscriber struct {
36+
ch chan []byte
37+
}
38+
39+
// C returns the channel on which messages are delivered.
40+
func (s *ResponseSubscriber) C() <-chan []byte { return s.ch }
41+
42+
// DaemonClient maintains a persistent WebSocket connection, appends all
43+
// received messages to an events file, and provides fan-out delivery to
44+
// registered subscribers.
45+
type DaemonClient struct {
46+
cfg DaemonConfig
47+
conn *websocket.Conn
48+
connID string
49+
sess SessionState
50+
evw *EventWriter
51+
wsMsgCh chan []byte
52+
wsErrCh chan error
53+
subsMu sync.Mutex
54+
subs []*ResponseSubscriber
55+
}
56+
57+
// NewDaemonClient creates a DaemonClient with the given configuration.
58+
// Call Run to start it.
59+
func NewDaemonClient(cfg DaemonConfig) *DaemonClient {
60+
return &DaemonClient{
61+
cfg: cfg,
62+
wsMsgCh: make(chan []byte, 256),
63+
wsErrCh: make(chan error, 1),
64+
}
65+
}
66+
67+
// Run connects to the WebSocket, performs session resume if a token is stored,
68+
// writes a PID file, opens the events file, then blocks until ctx is cancelled
69+
// or the WebSocket connection is lost.
70+
func (d *DaemonClient) Run(ctx context.Context) error {
71+
conn, _, err := websocket.DefaultDialer.Dial(d.cfg.WSURL, nil)
72+
if err != nil {
73+
return fmt.Errorf("failed to connect to %s: %w", d.cfg.WSURL, err)
74+
}
75+
defer conn.Close()
76+
d.conn = conn
77+
78+
// Read welcome message.
79+
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
80+
_, msg, err := conn.ReadMessage()
81+
conn.SetReadDeadline(time.Time{})
82+
if err != nil {
83+
return fmt.Errorf("failed to read welcome: %w", err)
84+
}
85+
86+
var welcome map[string]any
87+
json.Unmarshal(msg, &welcome) //nolint:errcheck
88+
d.connID, _ = welcome["connectionId"].(string)
89+
90+
// Persist connection ID.
91+
d.sess = LoadSession(d.cfg.SessionPath)
92+
d.sess.LastConnectionID = d.connID
93+
if err := SaveSession(d.cfg.SessionPath, d.sess); err != nil {
94+
return fmt.Errorf("failed to save session: %w", err)
95+
}
96+
97+
// Attempt session resume if we have a stored token.
98+
if d.sess.Token != "" {
99+
resumeCmd, _ := json.Marshal(map[string]any{
100+
"type": "session_resume",
101+
"token": d.sess.Token,
102+
})
103+
if err := conn.WriteMessage(websocket.TextMessage, resumeCmd); err != nil {
104+
fmt.Fprintf(os.Stderr, "warning: failed to send session_resume: %v\n", err)
105+
} else {
106+
conn.SetReadDeadline(time.Now().Add(5 * time.Second))
107+
_, resumeResp, err := conn.ReadMessage()
108+
conn.SetReadDeadline(time.Time{})
109+
if err != nil {
110+
fmt.Fprintf(os.Stderr, "warning: no response to session_resume: %v\n", err)
111+
} else {
112+
var r map[string]any
113+
json.Unmarshal(resumeResp, &r) //nolint:errcheck
114+
if rType, _ := r["type"].(string); rType == "session_resumed" {
115+
fmt.Fprintf(os.Stderr, "session resumed: gameId=%s playerId=%s\n",
116+
r["gameId"], r["playerId"])
117+
} else {
118+
fmt.Fprintf(os.Stderr, "session_resume failed, continuing as new connection\n")
119+
d.sess.Token = ""
120+
SaveSession(d.cfg.SessionPath, d.sess) //nolint:errcheck
121+
}
122+
}
123+
}
124+
}
125+
126+
// Write PID file.
127+
pidPath := d.cfg.SessionPath + ".pid"
128+
if err := os.WriteFile(pidPath, []byte(fmt.Sprintf("%d\n", os.Getpid())), 0o600); err != nil {
129+
return fmt.Errorf("failed to write pid file: %w", err)
130+
}
131+
defer os.Remove(pidPath)
132+
133+
// Open events file.
134+
eventsPath := d.cfg.SessionPath + ".events"
135+
evw, err := NewEventWriter(eventsPath)
136+
if err != nil {
137+
return fmt.Errorf("failed to open events file: %w", err)
138+
}
139+
defer evw.Close()
140+
d.evw = evw
141+
142+
// Welcome message is the first event.
143+
evw.Append(msg)
144+
if d.cfg.OnMessage != nil {
145+
d.cfg.OnMessage(msg)
146+
}
147+
148+
// WS reader goroutine — sole reader of the connection.
149+
go func() {
150+
for {
151+
_, data, err := conn.ReadMessage()
152+
if err != nil {
153+
d.wsErrCh <- err
154+
return
155+
}
156+
d.wsMsgCh <- data
157+
}
158+
}()
159+
160+
// Dispatcher goroutine — appends to events file and fans out.
161+
go func() {
162+
for data := range d.wsMsgCh {
163+
evw.Append(data)
164+
if d.cfg.OnMessage != nil {
165+
d.cfg.OnMessage(data)
166+
}
167+
d.fanOut(data)
168+
}
169+
}()
170+
171+
fmt.Fprintf(os.Stderr, "daemon started: connectionId=%s events=%s\n", d.connID, eventsPath)
172+
173+
// Block until ctx or WS error.
174+
select {
175+
case <-ctx.Done():
176+
fmt.Fprintf(os.Stderr, "daemon shutting down\n")
177+
case wsErr := <-d.wsErrCh:
178+
fmt.Fprintf(os.Stderr, "daemon WebSocket error: %v\n", wsErr)
179+
}
180+
return nil
181+
}
182+
183+
// Subscribe registers a new ResponseSubscriber to receive all subsequent
184+
// WebSocket messages. Unsubscribe when done to avoid leaking goroutines.
185+
func (d *DaemonClient) Subscribe() *ResponseSubscriber {
186+
sub := &ResponseSubscriber{ch: make(chan []byte, 64)}
187+
d.subsMu.Lock()
188+
d.subs = append(d.subs, sub)
189+
d.subsMu.Unlock()
190+
return sub
191+
}
192+
193+
// Unsubscribe removes a subscriber registered with Subscribe.
194+
func (d *DaemonClient) Unsubscribe(sub *ResponseSubscriber) {
195+
d.subsMu.Lock()
196+
for i, s := range d.subs {
197+
if s == sub {
198+
d.subs = append(d.subs[:i], d.subs[i+1:]...)
199+
break
200+
}
201+
}
202+
d.subsMu.Unlock()
203+
}
204+
205+
// SendWS sends raw bytes as a WebSocket text message.
206+
func (d *DaemonClient) SendWS(data []byte) error {
207+
if d.conn == nil {
208+
return fmt.Errorf("not connected")
209+
}
210+
return d.conn.WriteMessage(websocket.TextMessage, data)
211+
}
212+
213+
// WSConn returns the underlying WebSocket connection.
214+
// Do NOT read from it directly — the daemon's reader goroutine is the sole reader.
215+
func (d *DaemonClient) WSConn() *websocket.Conn { return d.conn }
216+
217+
// ConnectionID returns the server-assigned connection ID received in the welcome message.
218+
func (d *DaemonClient) ConnectionID() string { return d.connID }
219+
220+
// Session returns a pointer to the current session state.
221+
func (d *DaemonClient) Session() *SessionState { return &d.sess }
222+
223+
// fanOut delivers a copy of data to every registered subscriber, dropping the
224+
// message if a subscriber's buffer is full.
225+
func (d *DaemonClient) fanOut(data []byte) {
226+
d.subsMu.Lock()
227+
for _, sub := range d.subs {
228+
select {
229+
case sub.ch <- append([]byte(nil), data...):
230+
default: // drop — subscriber is stuck
231+
}
232+
}
233+
d.subsMu.Unlock()
234+
}
235+
236+
// IsBroadcast delegates to DaemonConfig.BroadcastFilter. Returns false if no
237+
// filter is configured.
238+
func (d *DaemonClient) IsBroadcast(data []byte) bool {
239+
if d.cfg.BroadcastFilter == nil {
240+
return false
241+
}
242+
return d.cfg.BroadcastFilter(data)
243+
}

sdk/events.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package sdk
2+
3+
import (
4+
"encoding/json"
5+
"os"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
)
10+
11+
// EventEntry is one line in the JSONL events file.
12+
type EventEntry struct {
13+
Seq int `json:"seq"`
14+
Ts string `json:"ts"`
15+
Msg json.RawMessage `json:"msg"`
16+
}
17+
18+
// EventWriter appends JSONL event entries to a file in a goroutine-safe manner.
19+
// Each call to Append assigns a monotonically increasing sequence number.
20+
type EventWriter struct {
21+
f *os.File
22+
mu sync.Mutex
23+
seqCounter atomic.Int64
24+
}
25+
26+
// NewEventWriter opens (or creates) the file at path in append mode and returns
27+
// an EventWriter ready to use. The caller must call Close when done.
28+
func NewEventWriter(path string) (*EventWriter, error) {
29+
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return &EventWriter{f: f}, nil
34+
}
35+
36+
// Append writes raw as a JSON event entry (with seq + timestamp) and returns
37+
// the assigned sequence number. Errors writing to disk are silently ignored to
38+
// avoid disrupting the caller's read loop.
39+
func (w *EventWriter) Append(raw []byte) int64 {
40+
seq := w.seqCounter.Add(1)
41+
entry := EventEntry{
42+
Seq: int(seq),
43+
Ts: time.Now().UTC().Format(time.RFC3339),
44+
Msg: json.RawMessage(raw),
45+
}
46+
line, _ := json.Marshal(entry)
47+
w.mu.Lock()
48+
w.f.Write(line) //nolint:errcheck
49+
w.f.Write([]byte("\n")) //nolint:errcheck
50+
w.mu.Unlock()
51+
return seq
52+
}
53+
54+
// Close flushes and closes the underlying file.
55+
func (w *EventWriter) Close() error {
56+
return w.f.Close()
57+
}

0 commit comments

Comments
 (0)