Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions src/daemon/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { loadConfig } from './config.js';
import { cancelRun, getRun, getRuns, sendInput, startRun } from './run-manager.js';
import { getHubSync } from '../hub/hub-sync.js';
import { readAuth } from '../hub/auth.js';
import { createHubClient } from '../hub/hub-client.js';

const VERSION = '0.2.0';
const startTime = Date.now();
Expand Down Expand Up @@ -122,9 +123,59 @@ export const createRoutes = (): Router => {
res.json({ ok: true });
});

// OAuth callback placeholder
router.get('/auth/callback', (_req, res) => {
res.send('Hub sync not yet available.');
// Hub proxy routes — daemon proxies CLI requests to hub API
router.get('/api/hub/machines', async (_req, res) => {
const auth = readAuth();
if (!auth) {
res.status(401).json({ error: 'Not logged in' });
return;
}
try {
const client = createHubClient(auth.hub.url, auth);
const machines = await client.getMachines();
res.json(machines);
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
res.status(502).json({ error: message });
}
});

router.get('/api/hub/agents', async (req, res) => {
const auth = readAuth();
if (!auth) {
res.status(401).json({ error: 'Not logged in' });
return;
}
try {
const client = createHubClient(auth.hub.url, auth);
const machineId = req.query.machine as string | undefined;
const hubAgents = await client.getAgents(machineId);
res.json(hubAgents);
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
res.status(502).json({ error: message });
}
});

router.post('/api/hub/runs', async (req, res) => {
const auth = readAuth();
if (!auth) {
res.status(401).json({ error: 'Not logged in' });
return;
}
try {
const client = createHubClient(auth.hub.url, auth);
const { machineId, agentName, input } = req.body as {
machineId: string;
agentName: string;
input: string;
};
const result = await client.createRun(machineId, agentName, input);
res.json(result);
} catch (err: unknown) {
const message = err instanceof Error ? err.message : String(err);
res.status(502).json({ error: message });
}
});

return router;
Expand Down
8 changes: 3 additions & 5 deletions src/daemon/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,8 @@ describe('server', () => {
expect(Array.isArray(body)).toBe(true);
});

it('GET /auth/callback returns placeholder', async () => {
const res = await fetch(`http://localhost:${port}/auth/callback`);
expect(res.status).toBe(200);
const text = await res.text();
expect(text).toContain('not yet available');
it('GET /api/hub/machines returns 401 when not logged in', async () => {
const res = await fetch(`http://localhost:${port}/api/hub/machines`);
expect(res.status).toBe(401);
});
});
16 changes: 16 additions & 0 deletions src/hub/hub-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ export interface HubClient {
deregister: (machineId: string) => Promise<void>;
getMachines: () => Promise<unknown[]>;
getAgents: (machineId?: string) => Promise<unknown[]>;
createRun: (machineId: string, agentName: string, input: string) => Promise<unknown>;
cancelRun: (runId: string) => Promise<void>;
sendRunInput: (runId: string, text: string) => Promise<void>;
}

export const createHubClient = (hubUrl: string, auth: AuthState): HubClient => {
Expand Down Expand Up @@ -73,5 +76,18 @@ export const createHubClient = (hubUrl: string, auth: AuthState): HubClient => {
const data = await request('GET', path);
return data as unknown[];
},

createRun: async (machineId, agentName, input) => {
const data = await request('POST', '/runs', { machineId, agentName, input });
return data;
},

cancelRun: async (runId) => {
await request('POST', `/runs/${runId}/cancel`);
},

sendRunInput: async (runId, text) => {
await request('POST', `/runs/${runId}/input`, { text });
},
};
};
45 changes: 36 additions & 9 deletions src/hub/hub-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { platform, arch } from 'node:os';
import { loadConfig } from '../daemon/config.js';
import { type AuthState, readAuth, saveAuth } from './auth.js';
import { createHubClient, type HubClient } from './hub-client.js';
import { createHubWs, type HubWs } from './hub-ws.js';
import { createReconnector, type Reconnector } from './reconnection.js';
import { logInfo, logWarn } from '../daemon/logger.js';
import { getAgents } from '../daemon/routes.js';
import { getRuns } from '../daemon/run-manager.js';
import { cancelRun, sendInput, getRuns } from '../daemon/run-manager.js';

const HEARTBEAT_INTERVAL_MS = 30_000;
const DAEMON_VERSION = '0.2.0';
Expand All @@ -18,11 +19,12 @@ export interface HubSync {

export const createHubSync = (): HubSync => {
let hubClient: HubClient | null = null;
let hubWs: HubWs | null = null;
let heartbeatTimer: ReturnType<typeof setInterval> | null = null;
let reconnector: Reconnector | null = null;
let connected = false;

const register = async (auth: AuthState): Promise<void> => {
const connectAll = async (auth: AuthState): Promise<void> => {
const config = loadConfig();

hubClient = createHubClient(auth.hub.url, auth);
Expand All @@ -39,15 +41,25 @@ export const createHubSync = (): HubSync => {
auth.hub.machineId = result.machineId;
saveAuth(auth);

connected = true;
logInfo(`Registered with hub as machine ${result.machineId}`);

// Connect WebSocket
hubWs = createHubWs(auth.hub.url, auth.session.access_token, auth.hub.machineId, () => {
// On disconnect — trigger reconnection
connected = false;
logWarn('[hub-sync] WS disconnected, will reconnect via heartbeat');
reconnector?.start();
});

hubWs.connect();
connected = true;
};

const startHeartbeat = (auth: AuthState): void => {
if (heartbeatTimer) clearInterval(heartbeatTimer);

heartbeatTimer = setInterval(async () => {
if (!hubClient || !connected) return;
if (!hubClient) return;

try {
const agents = getAgents().map((a) => ({
Expand All @@ -66,10 +78,20 @@ export const createHubSync = (): HubSync => {
activeRunIds,
});

// Process pending commands
// Process pending commands from hub
if (response.pendingCommands && Array.isArray(response.pendingCommands)) {
if (response.pendingCommands.length > 0) {
logInfo(`Received ${response.pendingCommands.length} pending command(s) from hub`);
for (const cmd of response.pendingCommands as Array<{
type: string;
runId: string;
payload?: string;
}>) {
if (cmd.type === 'cancel') {
cancelRun(cmd.runId);
logInfo(`Processed pending cancel for run ${cmd.runId}`);
} else if (cmd.type === 'input' && cmd.payload) {
sendInput(cmd.runId, cmd.payload);
logInfo(`Processed pending input for run ${cmd.runId}`);
}
}
}
} catch (err) {
Expand All @@ -88,7 +110,7 @@ export const createHubSync = (): HubSync => {

reconnector = createReconnector({
onReconnect: async () => {
await register(auth);
await connectAll(auth);
startHeartbeat(auth);
},
onError: (err) => {
Expand All @@ -99,7 +121,7 @@ export const createHubSync = (): HubSync => {
});

try {
await register(auth);
await connectAll(auth);
startHeartbeat(auth);
logInfo(`Connected to hub at ${auth.hub.url}`);
} catch (err) {
Expand All @@ -122,6 +144,11 @@ export const createHubSync = (): HubSync => {
reconnector = null;
}

if (hubWs) {
hubWs.disconnect();
hubWs = null;
}

if (hubClient && connected) {
const auth = readAuth();
if (auth) {
Expand Down
15 changes: 15 additions & 0 deletions src/hub/hub-ws.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { describe, test, expect } from 'vitest';
import { createHubWs } from './hub-ws.js';

describe('HubWs', () => {
test('creates without error', () => {
const hubWs = createHubWs('http://localhost:3001', 'test-token', 'machine-1');
expect(hubWs).toBeDefined();
expect(hubWs.isConnected()).toBe(false);
});

test('disconnect is safe when not connected', () => {
const hubWs = createHubWs('http://localhost:3001', 'test-token', 'machine-1');
expect(() => hubWs.disconnect()).not.toThrow();
});
});
160 changes: 160 additions & 0 deletions src/hub/hub-ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import WebSocket from 'ws';
import { logInfo, logError } from '../daemon/logger.js';
import { getAgents } from '../daemon/routes.js';
import {
startRun,
cancelRun,
sendInput,
onRunEvent,
onRunStateChange,
} from '../daemon/run-manager.js';

interface WsExecuteRequest {
type: 'execute';
requestId: string;
agentName: string;
input: { task: string; config?: Record<string, unknown>; context?: string[] };
}

interface WsCancel {
type: 'cancel';
runId: string;
}

interface WsSendInput {
type: 'input';
runId: string;
text: string;
}

type HubMessage = WsExecuteRequest | WsCancel | WsSendInput;

export interface HubWs {
connect: () => void;
disconnect: () => void;
isConnected: () => boolean;
}

export const createHubWs = (
hubUrl: string,
token: string,
machineId: string,
onDisconnect?: () => void
): HubWs => {
let ws: WebSocket | null = null;
let connected = false;
const eventUnsubscribers: Array<() => void> = [];

const send = (message: unknown): void => {
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
};

const handleExecute = async (msg: WsExecuteRequest): Promise<void> => {
const agents = getAgents();
const agent = agents.find((a) => a.manifest.name === msg.agentName);

if (!agent) {
send({ type: 'execute_rejected', requestId: msg.requestId, reason: 'Agent not found' });
return;
}

try {
const runId = await startRun(agent, msg.input.task, msg.input.config, msg.input.context);
send({ type: 'execute_accepted', requestId: msg.requestId, runId });

// Subscribe to run events and stream to hub
const unsubEvent = onRunEvent((eventRunId, event) => {
if (eventRunId === runId) {
send({ type: 'run_event', runId, event });
}
});

const unsubState = onRunStateChange((run) => {
if (run.id === runId) {
send({ type: 'run_state', runId, state: run.state, error: run.error, stats: run.stats });
}
});

eventUnsubscribers.push(unsubEvent, unsubState);
} catch (err) {
send({
type: 'execute_rejected',
requestId: msg.requestId,
reason: err instanceof Error ? err.message : String(err),
});
}
};

const handleMessage = (data: WebSocket.Data): void => {
try {
const msg = JSON.parse(data.toString()) as HubMessage;

switch (msg.type) {
case 'execute':
handleExecute(msg).catch((err) => {
logError(`Execute handler error: ${err instanceof Error ? err.message : String(err)}`);
});
break;

case 'cancel':
cancelRun(msg.runId);
logInfo(`Hub requested cancel for run ${msg.runId}`);
break;

case 'input':
sendInput(msg.runId, msg.text);
logInfo(`Hub sent input for run ${msg.runId}`);
break;
}
} catch {
logError('[hub-ws] Failed to parse message from hub');
}
};

return {
connect: () => {
const wsUrl = hubUrl.replace(/^http/, 'ws');
const url = `${wsUrl}/ws?token=${encodeURIComponent(token)}&machineId=${encodeURIComponent(machineId)}`;

ws = new WebSocket(url);

ws.on('open', () => {
connected = true;
logInfo('[hub-ws] Connected to hub');
});

ws.on('message', handleMessage);

ws.on('close', () => {
connected = false;
logInfo('[hub-ws] Disconnected from hub');
cleanup();
onDisconnect?.();
});

ws.on('error', (err) => {
logError(`[hub-ws] Error: ${err.message}`);
});
},

disconnect: () => {
cleanup();
if (ws) {
ws.close();
ws = null;
}
connected = false;
},

isConnected: () => connected,
};

function cleanup(): void {
for (const unsub of eventUnsubscribers) {
unsub();
}
eventUnsubscribers.length = 0;
}
};