Skip to content

Commit 80fc409

Browse files
authored
Merge pull request #51 from agentage/feature/phase-4-ws-execution
feat: hub WebSocket client + execution handling + proxy routes
2 parents a06a7de + 752633a commit 80fc409

6 files changed

Lines changed: 284 additions & 17 deletions

File tree

src/daemon/routes.ts

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { loadConfig } from './config.js';
44
import { cancelRun, getRun, getRuns, sendInput, startRun } from './run-manager.js';
55
import { getHubSync } from '../hub/hub-sync.js';
66
import { readAuth } from '../hub/auth.js';
7+
import { createHubClient } from '../hub/hub-client.js';
78

89
const VERSION = '0.2.0';
910
const startTime = Date.now();
@@ -122,9 +123,59 @@ export const createRoutes = (): Router => {
122123
res.json({ ok: true });
123124
});
124125

125-
// OAuth callback placeholder
126-
router.get('/auth/callback', (_req, res) => {
127-
res.send('Hub sync not yet available.');
126+
// Hub proxy routes — daemon proxies CLI requests to hub API
127+
router.get('/api/hub/machines', async (_req, res) => {
128+
const auth = readAuth();
129+
if (!auth) {
130+
res.status(401).json({ error: 'Not logged in' });
131+
return;
132+
}
133+
try {
134+
const client = createHubClient(auth.hub.url, auth);
135+
const machines = await client.getMachines();
136+
res.json(machines);
137+
} catch (err: unknown) {
138+
const message = err instanceof Error ? err.message : String(err);
139+
res.status(502).json({ error: message });
140+
}
141+
});
142+
143+
router.get('/api/hub/agents', async (req, res) => {
144+
const auth = readAuth();
145+
if (!auth) {
146+
res.status(401).json({ error: 'Not logged in' });
147+
return;
148+
}
149+
try {
150+
const client = createHubClient(auth.hub.url, auth);
151+
const machineId = req.query.machine as string | undefined;
152+
const hubAgents = await client.getAgents(machineId);
153+
res.json(hubAgents);
154+
} catch (err: unknown) {
155+
const message = err instanceof Error ? err.message : String(err);
156+
res.status(502).json({ error: message });
157+
}
158+
});
159+
160+
router.post('/api/hub/runs', async (req, res) => {
161+
const auth = readAuth();
162+
if (!auth) {
163+
res.status(401).json({ error: 'Not logged in' });
164+
return;
165+
}
166+
try {
167+
const client = createHubClient(auth.hub.url, auth);
168+
const { machineId, agentName, input } = req.body as {
169+
machineId: string;
170+
agentName: string;
171+
input: string;
172+
};
173+
const result = await client.createRun(machineId, agentName, input);
174+
res.json(result);
175+
} catch (err: unknown) {
176+
const message = err instanceof Error ? err.message : String(err);
177+
res.status(502).json({ error: message });
178+
}
128179
});
129180

130181
return router;

src/daemon/server.test.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -174,10 +174,8 @@ describe('server', () => {
174174
expect(Array.isArray(body)).toBe(true);
175175
});
176176

177-
it('GET /auth/callback returns placeholder', async () => {
178-
const res = await fetch(`http://localhost:${port}/auth/callback`);
179-
expect(res.status).toBe(200);
180-
const text = await res.text();
181-
expect(text).toContain('not yet available');
177+
it('GET /api/hub/machines returns 401 when not logged in', async () => {
178+
const res = await fetch(`http://localhost:${port}/api/hub/machines`);
179+
expect(res.status).toBe(401);
182180
});
183181
});

src/hub/hub-client.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ export interface HubClient {
1818
deregister: (machineId: string) => Promise<void>;
1919
getMachines: () => Promise<unknown[]>;
2020
getAgents: (machineId?: string) => Promise<unknown[]>;
21+
createRun: (machineId: string, agentName: string, input: string) => Promise<unknown>;
22+
cancelRun: (runId: string) => Promise<void>;
23+
sendRunInput: (runId: string, text: string) => Promise<void>;
2124
}
2225

2326
export const createHubClient = (hubUrl: string, auth: AuthState): HubClient => {
@@ -73,5 +76,18 @@ export const createHubClient = (hubUrl: string, auth: AuthState): HubClient => {
7376
const data = await request('GET', path);
7477
return data as unknown[];
7578
},
79+
80+
createRun: async (machineId, agentName, input) => {
81+
const data = await request('POST', '/runs', { machineId, agentName, input });
82+
return data;
83+
},
84+
85+
cancelRun: async (runId) => {
86+
await request('POST', `/runs/${runId}/cancel`);
87+
},
88+
89+
sendRunInput: async (runId, text) => {
90+
await request('POST', `/runs/${runId}/input`, { text });
91+
},
7692
};
7793
};

src/hub/hub-sync.ts

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ import { platform, arch } from 'node:os';
22
import { loadConfig } from '../daemon/config.js';
33
import { type AuthState, readAuth, saveAuth } from './auth.js';
44
import { createHubClient, type HubClient } from './hub-client.js';
5+
import { createHubWs, type HubWs } from './hub-ws.js';
56
import { createReconnector, type Reconnector } from './reconnection.js';
67
import { logInfo, logWarn } from '../daemon/logger.js';
78
import { getAgents } from '../daemon/routes.js';
8-
import { getRuns } from '../daemon/run-manager.js';
9+
import { cancelRun, sendInput, getRuns } from '../daemon/run-manager.js';
910

1011
const HEARTBEAT_INTERVAL_MS = 30_000;
1112
const DAEMON_VERSION = '0.2.0';
@@ -18,11 +19,12 @@ export interface HubSync {
1819

1920
export const createHubSync = (): HubSync => {
2021
let hubClient: HubClient | null = null;
22+
let hubWs: HubWs | null = null;
2123
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
2224
let reconnector: Reconnector | null = null;
2325
let connected = false;
2426

25-
const register = async (auth: AuthState): Promise<void> => {
27+
const connectAll = async (auth: AuthState): Promise<void> => {
2628
const config = loadConfig();
2729

2830
hubClient = createHubClient(auth.hub.url, auth);
@@ -39,15 +41,25 @@ export const createHubSync = (): HubSync => {
3941
auth.hub.machineId = result.machineId;
4042
saveAuth(auth);
4143

42-
connected = true;
4344
logInfo(`Registered with hub as machine ${result.machineId}`);
45+
46+
// Connect WebSocket
47+
hubWs = createHubWs(auth.hub.url, auth.session.access_token, auth.hub.machineId, () => {
48+
// On disconnect — trigger reconnection
49+
connected = false;
50+
logWarn('[hub-sync] WS disconnected, will reconnect via heartbeat');
51+
reconnector?.start();
52+
});
53+
54+
hubWs.connect();
55+
connected = true;
4456
};
4557

4658
const startHeartbeat = (auth: AuthState): void => {
4759
if (heartbeatTimer) clearInterval(heartbeatTimer);
4860

4961
heartbeatTimer = setInterval(async () => {
50-
if (!hubClient || !connected) return;
62+
if (!hubClient) return;
5163

5264
try {
5365
const agents = getAgents().map((a) => ({
@@ -66,10 +78,20 @@ export const createHubSync = (): HubSync => {
6678
activeRunIds,
6779
});
6880

69-
// Process pending commands
81+
// Process pending commands from hub
7082
if (response.pendingCommands && Array.isArray(response.pendingCommands)) {
71-
if (response.pendingCommands.length > 0) {
72-
logInfo(`Received ${response.pendingCommands.length} pending command(s) from hub`);
83+
for (const cmd of response.pendingCommands as Array<{
84+
type: string;
85+
runId: string;
86+
payload?: string;
87+
}>) {
88+
if (cmd.type === 'cancel') {
89+
cancelRun(cmd.runId);
90+
logInfo(`Processed pending cancel for run ${cmd.runId}`);
91+
} else if (cmd.type === 'input' && cmd.payload) {
92+
sendInput(cmd.runId, cmd.payload);
93+
logInfo(`Processed pending input for run ${cmd.runId}`);
94+
}
7395
}
7496
}
7597
} catch (err) {
@@ -88,7 +110,7 @@ export const createHubSync = (): HubSync => {
88110

89111
reconnector = createReconnector({
90112
onReconnect: async () => {
91-
await register(auth);
113+
await connectAll(auth);
92114
startHeartbeat(auth);
93115
},
94116
onError: (err) => {
@@ -99,7 +121,7 @@ export const createHubSync = (): HubSync => {
99121
});
100122

101123
try {
102-
await register(auth);
124+
await connectAll(auth);
103125
startHeartbeat(auth);
104126
logInfo(`Connected to hub at ${auth.hub.url}`);
105127
} catch (err) {
@@ -122,6 +144,11 @@ export const createHubSync = (): HubSync => {
122144
reconnector = null;
123145
}
124146

147+
if (hubWs) {
148+
hubWs.disconnect();
149+
hubWs = null;
150+
}
151+
125152
if (hubClient && connected) {
126153
const auth = readAuth();
127154
if (auth) {

src/hub/hub-ws.test.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { describe, test, expect } from 'vitest';
2+
import { createHubWs } from './hub-ws.js';
3+
4+
describe('HubWs', () => {
5+
test('creates without error', () => {
6+
const hubWs = createHubWs('http://localhost:3001', 'test-token', 'machine-1');
7+
expect(hubWs).toBeDefined();
8+
expect(hubWs.isConnected()).toBe(false);
9+
});
10+
11+
test('disconnect is safe when not connected', () => {
12+
const hubWs = createHubWs('http://localhost:3001', 'test-token', 'machine-1');
13+
expect(() => hubWs.disconnect()).not.toThrow();
14+
});
15+
});

src/hub/hub-ws.ts

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import WebSocket from 'ws';
2+
import { logInfo, logError } from '../daemon/logger.js';
3+
import { getAgents } from '../daemon/routes.js';
4+
import {
5+
startRun,
6+
cancelRun,
7+
sendInput,
8+
onRunEvent,
9+
onRunStateChange,
10+
} from '../daemon/run-manager.js';
11+
12+
interface WsExecuteRequest {
13+
type: 'execute';
14+
requestId: string;
15+
agentName: string;
16+
input: { task: string; config?: Record<string, unknown>; context?: string[] };
17+
}
18+
19+
interface WsCancel {
20+
type: 'cancel';
21+
runId: string;
22+
}
23+
24+
interface WsSendInput {
25+
type: 'input';
26+
runId: string;
27+
text: string;
28+
}
29+
30+
type HubMessage = WsExecuteRequest | WsCancel | WsSendInput;
31+
32+
export interface HubWs {
33+
connect: () => void;
34+
disconnect: () => void;
35+
isConnected: () => boolean;
36+
}
37+
38+
export const createHubWs = (
39+
hubUrl: string,
40+
token: string,
41+
machineId: string,
42+
onDisconnect?: () => void
43+
): HubWs => {
44+
let ws: WebSocket | null = null;
45+
let connected = false;
46+
const eventUnsubscribers: Array<() => void> = [];
47+
48+
const send = (message: unknown): void => {
49+
if (ws?.readyState === WebSocket.OPEN) {
50+
ws.send(JSON.stringify(message));
51+
}
52+
};
53+
54+
const handleExecute = async (msg: WsExecuteRequest): Promise<void> => {
55+
const agents = getAgents();
56+
const agent = agents.find((a) => a.manifest.name === msg.agentName);
57+
58+
if (!agent) {
59+
send({ type: 'execute_rejected', requestId: msg.requestId, reason: 'Agent not found' });
60+
return;
61+
}
62+
63+
try {
64+
const runId = await startRun(agent, msg.input.task, msg.input.config, msg.input.context);
65+
send({ type: 'execute_accepted', requestId: msg.requestId, runId });
66+
67+
// Subscribe to run events and stream to hub
68+
const unsubEvent = onRunEvent((eventRunId, event) => {
69+
if (eventRunId === runId) {
70+
send({ type: 'run_event', runId, event });
71+
}
72+
});
73+
74+
const unsubState = onRunStateChange((run) => {
75+
if (run.id === runId) {
76+
send({ type: 'run_state', runId, state: run.state, error: run.error, stats: run.stats });
77+
}
78+
});
79+
80+
eventUnsubscribers.push(unsubEvent, unsubState);
81+
} catch (err) {
82+
send({
83+
type: 'execute_rejected',
84+
requestId: msg.requestId,
85+
reason: err instanceof Error ? err.message : String(err),
86+
});
87+
}
88+
};
89+
90+
const handleMessage = (data: WebSocket.Data): void => {
91+
try {
92+
const msg = JSON.parse(data.toString()) as HubMessage;
93+
94+
switch (msg.type) {
95+
case 'execute':
96+
handleExecute(msg).catch((err) => {
97+
logError(`Execute handler error: ${err instanceof Error ? err.message : String(err)}`);
98+
});
99+
break;
100+
101+
case 'cancel':
102+
cancelRun(msg.runId);
103+
logInfo(`Hub requested cancel for run ${msg.runId}`);
104+
break;
105+
106+
case 'input':
107+
sendInput(msg.runId, msg.text);
108+
logInfo(`Hub sent input for run ${msg.runId}`);
109+
break;
110+
}
111+
} catch {
112+
logError('[hub-ws] Failed to parse message from hub');
113+
}
114+
};
115+
116+
return {
117+
connect: () => {
118+
const wsUrl = hubUrl.replace(/^http/, 'ws');
119+
const url = `${wsUrl}/ws?token=${encodeURIComponent(token)}&machineId=${encodeURIComponent(machineId)}`;
120+
121+
ws = new WebSocket(url);
122+
123+
ws.on('open', () => {
124+
connected = true;
125+
logInfo('[hub-ws] Connected to hub');
126+
});
127+
128+
ws.on('message', handleMessage);
129+
130+
ws.on('close', () => {
131+
connected = false;
132+
logInfo('[hub-ws] Disconnected from hub');
133+
cleanup();
134+
onDisconnect?.();
135+
});
136+
137+
ws.on('error', (err) => {
138+
logError(`[hub-ws] Error: ${err.message}`);
139+
});
140+
},
141+
142+
disconnect: () => {
143+
cleanup();
144+
if (ws) {
145+
ws.close();
146+
ws = null;
147+
}
148+
connected = false;
149+
},
150+
151+
isConnected: () => connected,
152+
};
153+
154+
function cleanup(): void {
155+
for (const unsub of eventUnsubscribers) {
156+
unsub();
157+
}
158+
eventUnsubscribers.length = 0;
159+
}
160+
};

0 commit comments

Comments
 (0)