diff --git a/src/daemon/routes.ts b/src/daemon/routes.ts index c83747e..89be8e9 100644 --- a/src/daemon/routes.ts +++ b/src/daemon/routes.ts @@ -4,6 +4,7 @@ import { loadConfig } from './config.js'; import { cancelRun, getRun, getRuns, sendInput, startRun } from './run-manager.js'; import { getHubSync } from '../hub/hub-sync.js'; import { readAuth } from '../hub/auth.js'; +import { createHubClient } from '../hub/hub-client.js'; const VERSION = '0.2.0'; const startTime = Date.now(); @@ -122,9 +123,59 @@ export const createRoutes = (): Router => { res.json({ ok: true }); }); - // OAuth callback placeholder - router.get('/auth/callback', (_req, res) => { - res.send('Hub sync not yet available.'); + // Hub proxy routes — daemon proxies CLI requests to hub API + router.get('/api/hub/machines', async (_req, res) => { + const auth = readAuth(); + if (!auth) { + res.status(401).json({ error: 'Not logged in' }); + return; + } + try { + const client = createHubClient(auth.hub.url, auth); + const machines = await client.getMachines(); + res.json(machines); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + res.status(502).json({ error: message }); + } + }); + + router.get('/api/hub/agents', async (req, res) => { + const auth = readAuth(); + if (!auth) { + res.status(401).json({ error: 'Not logged in' }); + return; + } + try { + const client = createHubClient(auth.hub.url, auth); + const machineId = req.query.machine as string | undefined; + const hubAgents = await client.getAgents(machineId); + res.json(hubAgents); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + res.status(502).json({ error: message }); + } + }); + + router.post('/api/hub/runs', async (req, res) => { + const auth = readAuth(); + if (!auth) { + res.status(401).json({ error: 'Not logged in' }); + return; + } + try { + const client = createHubClient(auth.hub.url, auth); + const { machineId, agentName, input } = req.body as { + machineId: string; + agentName: string; + input: string; + }; + const result = await client.createRun(machineId, agentName, input); + res.json(result); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + res.status(502).json({ error: message }); + } }); return router; diff --git a/src/daemon/server.test.ts b/src/daemon/server.test.ts index 0ffab93..2f33854 100644 --- a/src/daemon/server.test.ts +++ b/src/daemon/server.test.ts @@ -174,10 +174,8 @@ describe('server', () => { expect(Array.isArray(body)).toBe(true); }); - it('GET /auth/callback returns placeholder', async () => { - const res = await fetch(`http://localhost:${port}/auth/callback`); - expect(res.status).toBe(200); - const text = await res.text(); - expect(text).toContain('not yet available'); + it('GET /api/hub/machines returns 401 when not logged in', async () => { + const res = await fetch(`http://localhost:${port}/api/hub/machines`); + expect(res.status).toBe(401); }); }); diff --git a/src/hub/hub-client.ts b/src/hub/hub-client.ts index 7ee09dd..74ca928 100644 --- a/src/hub/hub-client.ts +++ b/src/hub/hub-client.ts @@ -18,6 +18,9 @@ export interface HubClient { deregister: (machineId: string) => Promise; getMachines: () => Promise; getAgents: (machineId?: string) => Promise; + createRun: (machineId: string, agentName: string, input: string) => Promise; + cancelRun: (runId: string) => Promise; + sendRunInput: (runId: string, text: string) => Promise; } export const createHubClient = (hubUrl: string, auth: AuthState): HubClient => { @@ -73,5 +76,18 @@ export const createHubClient = (hubUrl: string, auth: AuthState): HubClient => { const data = await request('GET', path); return data as unknown[]; }, + + createRun: async (machineId, agentName, input) => { + const data = await request('POST', '/runs', { machineId, agentName, input }); + return data; + }, + + cancelRun: async (runId) => { + await request('POST', `/runs/${runId}/cancel`); + }, + + sendRunInput: async (runId, text) => { + await request('POST', `/runs/${runId}/input`, { text }); + }, }; }; diff --git a/src/hub/hub-sync.ts b/src/hub/hub-sync.ts index 7b0f9d3..11e4f6c 100644 --- a/src/hub/hub-sync.ts +++ b/src/hub/hub-sync.ts @@ -2,10 +2,11 @@ import { platform, arch } from 'node:os'; import { loadConfig } from '../daemon/config.js'; import { type AuthState, readAuth, saveAuth } from './auth.js'; import { createHubClient, type HubClient } from './hub-client.js'; +import { createHubWs, type HubWs } from './hub-ws.js'; import { createReconnector, type Reconnector } from './reconnection.js'; import { logInfo, logWarn } from '../daemon/logger.js'; import { getAgents } from '../daemon/routes.js'; -import { getRuns } from '../daemon/run-manager.js'; +import { cancelRun, sendInput, getRuns } from '../daemon/run-manager.js'; const HEARTBEAT_INTERVAL_MS = 30_000; const DAEMON_VERSION = '0.2.0'; @@ -18,11 +19,12 @@ export interface HubSync { export const createHubSync = (): HubSync => { let hubClient: HubClient | null = null; + let hubWs: HubWs | null = null; let heartbeatTimer: ReturnType | null = null; let reconnector: Reconnector | null = null; let connected = false; - const register = async (auth: AuthState): Promise => { + const connectAll = async (auth: AuthState): Promise => { const config = loadConfig(); hubClient = createHubClient(auth.hub.url, auth); @@ -39,15 +41,25 @@ export const createHubSync = (): HubSync => { auth.hub.machineId = result.machineId; saveAuth(auth); - connected = true; logInfo(`Registered with hub as machine ${result.machineId}`); + + // Connect WebSocket + hubWs = createHubWs(auth.hub.url, auth.session.access_token, auth.hub.machineId, () => { + // On disconnect — trigger reconnection + connected = false; + logWarn('[hub-sync] WS disconnected, will reconnect via heartbeat'); + reconnector?.start(); + }); + + hubWs.connect(); + connected = true; }; const startHeartbeat = (auth: AuthState): void => { if (heartbeatTimer) clearInterval(heartbeatTimer); heartbeatTimer = setInterval(async () => { - if (!hubClient || !connected) return; + if (!hubClient) return; try { const agents = getAgents().map((a) => ({ @@ -66,10 +78,20 @@ export const createHubSync = (): HubSync => { activeRunIds, }); - // Process pending commands + // Process pending commands from hub if (response.pendingCommands && Array.isArray(response.pendingCommands)) { - if (response.pendingCommands.length > 0) { - logInfo(`Received ${response.pendingCommands.length} pending command(s) from hub`); + for (const cmd of response.pendingCommands as Array<{ + type: string; + runId: string; + payload?: string; + }>) { + if (cmd.type === 'cancel') { + cancelRun(cmd.runId); + logInfo(`Processed pending cancel for run ${cmd.runId}`); + } else if (cmd.type === 'input' && cmd.payload) { + sendInput(cmd.runId, cmd.payload); + logInfo(`Processed pending input for run ${cmd.runId}`); + } } } } catch (err) { @@ -88,7 +110,7 @@ export const createHubSync = (): HubSync => { reconnector = createReconnector({ onReconnect: async () => { - await register(auth); + await connectAll(auth); startHeartbeat(auth); }, onError: (err) => { @@ -99,7 +121,7 @@ export const createHubSync = (): HubSync => { }); try { - await register(auth); + await connectAll(auth); startHeartbeat(auth); logInfo(`Connected to hub at ${auth.hub.url}`); } catch (err) { @@ -122,6 +144,11 @@ export const createHubSync = (): HubSync => { reconnector = null; } + if (hubWs) { + hubWs.disconnect(); + hubWs = null; + } + if (hubClient && connected) { const auth = readAuth(); if (auth) { diff --git a/src/hub/hub-ws.test.ts b/src/hub/hub-ws.test.ts new file mode 100644 index 0000000..ba5b52d --- /dev/null +++ b/src/hub/hub-ws.test.ts @@ -0,0 +1,15 @@ +import { describe, test, expect } from 'vitest'; +import { createHubWs } from './hub-ws.js'; + +describe('HubWs', () => { + test('creates without error', () => { + const hubWs = createHubWs('http://localhost:3001', 'test-token', 'machine-1'); + expect(hubWs).toBeDefined(); + expect(hubWs.isConnected()).toBe(false); + }); + + test('disconnect is safe when not connected', () => { + const hubWs = createHubWs('http://localhost:3001', 'test-token', 'machine-1'); + expect(() => hubWs.disconnect()).not.toThrow(); + }); +}); diff --git a/src/hub/hub-ws.ts b/src/hub/hub-ws.ts new file mode 100644 index 0000000..4b28a73 --- /dev/null +++ b/src/hub/hub-ws.ts @@ -0,0 +1,160 @@ +import WebSocket from 'ws'; +import { logInfo, logError } from '../daemon/logger.js'; +import { getAgents } from '../daemon/routes.js'; +import { + startRun, + cancelRun, + sendInput, + onRunEvent, + onRunStateChange, +} from '../daemon/run-manager.js'; + +interface WsExecuteRequest { + type: 'execute'; + requestId: string; + agentName: string; + input: { task: string; config?: Record; context?: string[] }; +} + +interface WsCancel { + type: 'cancel'; + runId: string; +} + +interface WsSendInput { + type: 'input'; + runId: string; + text: string; +} + +type HubMessage = WsExecuteRequest | WsCancel | WsSendInput; + +export interface HubWs { + connect: () => void; + disconnect: () => void; + isConnected: () => boolean; +} + +export const createHubWs = ( + hubUrl: string, + token: string, + machineId: string, + onDisconnect?: () => void +): HubWs => { + let ws: WebSocket | null = null; + let connected = false; + const eventUnsubscribers: Array<() => void> = []; + + const send = (message: unknown): void => { + if (ws?.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(message)); + } + }; + + const handleExecute = async (msg: WsExecuteRequest): Promise => { + const agents = getAgents(); + const agent = agents.find((a) => a.manifest.name === msg.agentName); + + if (!agent) { + send({ type: 'execute_rejected', requestId: msg.requestId, reason: 'Agent not found' }); + return; + } + + try { + const runId = await startRun(agent, msg.input.task, msg.input.config, msg.input.context); + send({ type: 'execute_accepted', requestId: msg.requestId, runId }); + + // Subscribe to run events and stream to hub + const unsubEvent = onRunEvent((eventRunId, event) => { + if (eventRunId === runId) { + send({ type: 'run_event', runId, event }); + } + }); + + const unsubState = onRunStateChange((run) => { + if (run.id === runId) { + send({ type: 'run_state', runId, state: run.state, error: run.error, stats: run.stats }); + } + }); + + eventUnsubscribers.push(unsubEvent, unsubState); + } catch (err) { + send({ + type: 'execute_rejected', + requestId: msg.requestId, + reason: err instanceof Error ? err.message : String(err), + }); + } + }; + + const handleMessage = (data: WebSocket.Data): void => { + try { + const msg = JSON.parse(data.toString()) as HubMessage; + + switch (msg.type) { + case 'execute': + handleExecute(msg).catch((err) => { + logError(`Execute handler error: ${err instanceof Error ? err.message : String(err)}`); + }); + break; + + case 'cancel': + cancelRun(msg.runId); + logInfo(`Hub requested cancel for run ${msg.runId}`); + break; + + case 'input': + sendInput(msg.runId, msg.text); + logInfo(`Hub sent input for run ${msg.runId}`); + break; + } + } catch { + logError('[hub-ws] Failed to parse message from hub'); + } + }; + + return { + connect: () => { + const wsUrl = hubUrl.replace(/^http/, 'ws'); + const url = `${wsUrl}/ws?token=${encodeURIComponent(token)}&machineId=${encodeURIComponent(machineId)}`; + + ws = new WebSocket(url); + + ws.on('open', () => { + connected = true; + logInfo('[hub-ws] Connected to hub'); + }); + + ws.on('message', handleMessage); + + ws.on('close', () => { + connected = false; + logInfo('[hub-ws] Disconnected from hub'); + cleanup(); + onDisconnect?.(); + }); + + ws.on('error', (err) => { + logError(`[hub-ws] Error: ${err.message}`); + }); + }, + + disconnect: () => { + cleanup(); + if (ws) { + ws.close(); + ws = null; + } + connected = false; + }, + + isConnected: () => connected, + }; + + function cleanup(): void { + for (const unsub of eventUnsubscribers) { + unsub(); + } + eventUnsubscribers.length = 0; + } +};