Skip to content

Commit e83be53

Browse files
committed
feat: add WebSocket event buffering + E2E tests
- Fix race condition: buffer events per run so late WS subscribers get replayed events (200 msg cap, 60s TTL auto-cleanup) - Fix POST /api/agents/refresh to actually rescan discovery dirs - Add server.setFactories() for refresh to use correct factories - Add 20 E2E tests covering scenarios 1-9 from Phase 2 exit criteria: config creation, agent discovery from .agent.md, run execution, WebSocket streaming with replay, run lifecycle, refresh rescan, REST API contract validation, error cases - Total: 122 tests (102 unit/integration + 20 E2E)
1 parent 23cf22d commit e83be53

5 files changed

Lines changed: 512 additions & 8 deletions

File tree

src/daemon-entry.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const main = async (): Promise<void> => {
1313

1414
const server = createDaemonServer();
1515
const factories = [createMarkdownFactory(), createCodeFactory()];
16+
server.setFactories(factories);
1617

1718
// Initial agent discovery
1819
const agents = await scanAgents(config.discovery.dirs, factories);

src/daemon/routes.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,18 @@ const VERSION = '0.2.0';
77
const startTime = Date.now();
88

99
let agents: Agent[] = [];
10+
let refreshHandler: (() => Promise<Agent[]>) | null = null;
1011

1112
export const setAgents = (newAgents: Agent[]): void => {
1213
agents = newAgents;
1314
};
1415

1516
export const getAgents = (): Agent[] => agents;
1617

18+
export const setRefreshHandler = (handler: () => Promise<Agent[]>): void => {
19+
refreshHandler = handler;
20+
};
21+
1722
export const createRoutes = (): Router => {
1823
const router = createRouter();
1924
router.use(json());
@@ -33,9 +38,16 @@ export const createRoutes = (): Router => {
3338
res.json(agents.map((a) => a.manifest));
3439
});
3540

36-
router.post('/api/agents/refresh', (_req, res) => {
37-
// Scanner will be called by the server module
38-
res.json(agents.map((a) => a.manifest));
41+
router.post('/api/agents/refresh', async (_req, res) => {
42+
try {
43+
if (refreshHandler) {
44+
agents = await refreshHandler();
45+
}
46+
res.json(agents.map((a) => a.manifest));
47+
} catch (err: unknown) {
48+
const message = err instanceof Error ? err.message : String(err);
49+
res.status(500).json({ error: message });
50+
}
3951
});
4052

4153
router.post('/api/agents/:name/run', async (req, res) => {

src/daemon/server.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,40 @@
11
import { createServer, type Server } from 'node:http';
22
import express from 'express';
3-
import { type Agent } from '@agentage/core';
3+
import { type Agent, type AgentFactory } from '@agentage/core';
44
import { loadConfig } from './config.js';
55
import { logInfo } from './logger.js';
6-
import { createRoutes, setAgents } from './routes.js';
6+
import { createRoutes, setAgents, setRefreshHandler } from './routes.js';
77
import { setupWebSocket } from './websocket.js';
88
import { cancelAllRuns } from './run-manager.js';
9+
import { scanAgents } from '../discovery/scanner.js';
910

1011
export interface DaemonServer {
1112
server: Server;
1213
start: () => Promise<void>;
1314
stop: () => Promise<void>;
1415
updateAgents: (agents: Agent[]) => void;
16+
setFactories: (factories: AgentFactory[]) => void;
1517
}
1618

1719
export const createDaemonServer = (): DaemonServer => {
1820
const app = express();
1921
const server = createServer(app);
22+
let factories: AgentFactory[] = [];
2023

2124
const routes = createRoutes();
2225
app.use(routes);
2326

2427
setupWebSocket(server);
2528

29+
// Wire up refresh to actually rescan
30+
setRefreshHandler(async () => {
31+
const config = loadConfig();
32+
const agents = await scanAgents(config.discovery.dirs, factories);
33+
setAgents(agents);
34+
logInfo(`Refresh: discovered ${agents.length} agent(s)`);
35+
return agents;
36+
});
37+
2638
const start = async (): Promise<void> => {
2739
const config = loadConfig();
2840
const port = config.daemon.port;
@@ -57,5 +69,9 @@ export const createDaemonServer = (): DaemonServer => {
5769
setAgents(agents);
5870
};
5971

60-
return { server, start, stop, updateAgents };
72+
const setFactoriesFn = (f: AgentFactory[]): void => {
73+
factories = f;
74+
};
75+
76+
return { server, start, stop, updateAgents, setFactories: setFactoriesFn };
6177
};

src/daemon/websocket.ts

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,42 @@ interface UnsubscribeMessage {
1616

1717
type ClientMessage = SubscribeMessage | UnsubscribeMessage;
1818

19+
type BufferedMessage =
20+
| { type: 'run_event'; runId: string; event: RunEvent }
21+
| { type: 'run_state'; run: Run };
22+
23+
const MAX_BUFFER_PER_RUN = 200;
24+
const BUFFER_TTL_MS = 60_000;
25+
1926
const clientSubscriptions = new Map<WebSocket, Set<string>>();
27+
const runBuffers = new Map<string, BufferedMessage[]>();
28+
const bufferTimers = new Map<string, ReturnType<typeof setTimeout>>();
29+
30+
const bufferMessage = (runId: string, msg: BufferedMessage): void => {
31+
let buf = runBuffers.get(runId);
32+
if (!buf) {
33+
buf = [];
34+
runBuffers.set(runId, buf);
35+
// Auto-cleanup after TTL
36+
const timer = setTimeout(() => {
37+
runBuffers.delete(runId);
38+
bufferTimers.delete(runId);
39+
}, BUFFER_TTL_MS);
40+
bufferTimers.set(runId, timer);
41+
}
42+
if (buf.length < MAX_BUFFER_PER_RUN) {
43+
buf.push(msg);
44+
}
45+
};
46+
47+
const replayBuffer = (ws: WebSocket, runId: string): void => {
48+
const buf = runBuffers.get(runId);
49+
if (!buf) return;
50+
logDebug(`Replaying ${buf.length} buffered messages for run ${runId}`);
51+
for (const msg of buf) {
52+
sendToClient(ws, msg);
53+
}
54+
};
2055

2156
const sendToClient = (ws: WebSocket, data: unknown): void => {
2257
if (ws.readyState === ws.OPEN) {
@@ -28,17 +63,23 @@ export const setupWebSocket = (server: Server): WebSocketServer => {
2863
const wss = new WebSocketServer({ server, path: '/ws' });
2964

3065
onRunEvent((runId: string, event: RunEvent) => {
66+
const msg: BufferedMessage = { type: 'run_event', runId, event };
67+
bufferMessage(runId, msg);
68+
3169
for (const [ws, subs] of clientSubscriptions) {
3270
if (subs.has(runId)) {
33-
sendToClient(ws, { type: 'run_event', runId, event });
71+
sendToClient(ws, msg);
3472
}
3573
}
3674
});
3775

3876
onRunStateChange((run: Run) => {
77+
const msg: BufferedMessage = { type: 'run_state', run };
78+
bufferMessage(run.id, msg);
79+
3980
for (const [ws, subs] of clientSubscriptions) {
4081
if (subs.has(run.id)) {
41-
sendToClient(ws, { type: 'run_state', run });
82+
sendToClient(ws, msg);
4283
}
4384
}
4485
});
@@ -54,6 +95,8 @@ export const setupWebSocket = (server: Server): WebSocketServer => {
5495
if (msg.type === 'subscribe') {
5596
clientSubscriptions.get(ws)?.add(msg.runId);
5697
logDebug(`Client subscribed to run ${msg.runId}`);
98+
// Replay any buffered events the client missed
99+
replayBuffer(ws, msg.runId);
57100
}
58101

59102
if (msg.type === 'unsubscribe') {

0 commit comments

Comments
 (0)