diff --git a/package-lock.json b/package-lock.json index b8494bc..27c9259 100644 --- a/package-lock.json +++ b/package-lock.json @@ -11,30 +11,31 @@ "dependencies": { "@agentage/core": "^0.2.0", "@agentage/platform": "^0.2.0", - "chalk": "*", - "commander": "*", - "express": "*", - "gray-matter": "*", - "jiti": "*", - "open": "*", - "ws": "*" + "@supabase/supabase-js": "2.99.3", + "chalk": "latest", + "commander": "latest", + "express": "latest", + "gray-matter": "latest", + "jiti": "latest", + "open": "latest", + "ws": "latest" }, "bin": { "agentage": "dist/cli.js" }, "devDependencies": { - "@types/express": "*", - "@types/node": "*", - "@types/ws": "*", - "@typescript-eslint/eslint-plugin": "*", - "@typescript-eslint/parser": "*", - "@vitest/coverage-v8": "*", + "@types/express": "latest", + "@types/node": "latest", + "@types/ws": "latest", + "@typescript-eslint/eslint-plugin": "latest", + "@typescript-eslint/parser": "latest", + "@vitest/coverage-v8": "latest", "eslint": "latest", - "eslint-config-prettier": "*", - "eslint-plugin-prettier": "*", - "prettier": "*", - "typescript": "*", - "vitest": "*" + "eslint-config-prettier": "latest", + "eslint-plugin-prettier": "latest", + "prettier": "latest", + "typescript": "latest", + "vitest": "latest" }, "engines": { "node": ">=22.0.0", @@ -641,6 +642,86 @@ "dev": true, "license": "MIT" }, + "node_modules/@supabase/auth-js": { + "version": "2.99.3", + "resolved": "https://registry.npmjs.org/@supabase/auth-js/-/auth-js-2.99.3.tgz", + "integrity": "sha512-vMEVLA1kGGYd/kdsJSwtjiFUZM1nGfrz2DWmgMBZtocV48qL+L2+4QpIkueXyBEumMQZFEyhz57i/5zGHjvdBw==", + "license": "MIT", + "dependencies": { + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/functions-js": { + "version": "2.99.3", + "resolved": "https://registry.npmjs.org/@supabase/functions-js/-/functions-js-2.99.3.tgz", + "integrity": "sha512-6tk2zrcBkzKaaBXPOG5nshn30uJNFGOH9LxOnE8i850eQmsX+jVm7vql9kTPyvUzEHwU4zdjSOkXS9M+9ukMVA==", + "license": "MIT", + "dependencies": { + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/postgrest-js": { + "version": "2.99.3", + "resolved": "https://registry.npmjs.org/@supabase/postgrest-js/-/postgrest-js-2.99.3.tgz", + "integrity": "sha512-8HxEf+zNycj7Z8+ONhhlu+7J7Ha+L6weyCtdEeK2mN5OWJbh6n4LPU4iuJ5UlCvvNnbSXMoutY7piITEEAgl2g==", + "license": "MIT", + "dependencies": { + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/realtime-js": { + "version": "2.99.3", + "resolved": "https://registry.npmjs.org/@supabase/realtime-js/-/realtime-js-2.99.3.tgz", + "integrity": "sha512-c1azgZ2nZPczbY5k5u5iFrk1InpxN81IvNE+UBAkjrBz3yc5ALLJNkeTQwbJZT4PZBuYXEzqYGLMuh9fdTtTMg==", + "license": "MIT", + "dependencies": { + "@types/phoenix": "^1.6.6", + "@types/ws": "^8.18.1", + "tslib": "2.8.1", + "ws": "^8.18.2" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/storage-js": { + "version": "2.99.3", + "resolved": "https://registry.npmjs.org/@supabase/storage-js/-/storage-js-2.99.3.tgz", + "integrity": "sha512-lOfIm4hInNcd8x0i1LWphnLKxec42wwbjs+vhaVAvR801Vda0UAMbTooUY6gfqgQb8v29GofqKuQMMTAsl6w/w==", + "license": "MIT", + "dependencies": { + "iceberg-js": "^0.8.1", + "tslib": "2.8.1" + }, + "engines": { + "node": ">=20.0.0" + } + }, + "node_modules/@supabase/supabase-js": { + "version": "2.99.3", + "resolved": "https://registry.npmjs.org/@supabase/supabase-js/-/supabase-js-2.99.3.tgz", + "integrity": "sha512-GuPbzoEaI51AkLw9VGhLNvnzw4PHbS3p8j2/JlvLeZNQMKwZw4aEYQIDBRtFwL5Nv7/275n9m4DHtakY8nCvgg==", + "license": "MIT", + "dependencies": { + "@supabase/auth-js": "2.99.3", + "@supabase/functions-js": "2.99.3", + "@supabase/postgrest-js": "2.99.3", + "@supabase/realtime-js": "2.99.3", + "@supabase/storage-js": "2.99.3" + }, + "engines": { + "node": ">=20.0.0" + } + }, "node_modules/@tybys/wasm-util": { "version": "0.10.1", "resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.1.tgz", @@ -748,12 +829,17 @@ "version": "25.5.0", "resolved": "https://registry.npmjs.org/@types/node/-/node-25.5.0.tgz", "integrity": "sha512-jp2P3tQMSxWugkCUKLRPVUpGaL5MVFwF8RDuSRztfwgN1wmqJeMSbKlnEtQqU8UrhTmzEmZdu2I6v2dpp7XIxw==", - "dev": true, "license": "MIT", "dependencies": { "undici-types": "~7.18.0" } }, + "node_modules/@types/phoenix": { + "version": "1.6.7", + "resolved": "https://registry.npmjs.org/@types/phoenix/-/phoenix-1.6.7.tgz", + "integrity": "sha512-oN9ive//QSBkf19rfDv45M7eZPi0eEXylht2OLEXicu5b4KoQ1OzXIw+xDSGWxSxe1JmepRR/ZH283vsu518/Q==", + "license": "MIT" + }, "node_modules/@types/qs": { "version": "6.15.0", "resolved": "https://registry.npmjs.org/@types/qs/-/qs-6.15.0.tgz", @@ -793,7 +879,6 @@ "version": "8.18.1", "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", - "dev": true, "license": "MIT", "dependencies": { "@types/node": "*" @@ -2290,6 +2375,15 @@ "url": "https://opencollective.com/express" } }, + "node_modules/iceberg-js": { + "version": "0.8.1", + "resolved": "https://registry.npmjs.org/iceberg-js/-/iceberg-js-0.8.1.tgz", + "integrity": "sha512-1dhVQZXhcHje7798IVM+xoo/1ZdVfzOMIc8/rgVSijRK38EDqOJoGula9N/8ZI5RD8QTxNQtK/Gozpr+qUqRRA==", + "license": "MIT", + "engines": { + "node": ">=20.0.0" + } + }, "node_modules/iconv-lite": { "version": "0.7.2", "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.7.2.tgz", @@ -3711,9 +3805,7 @@ "version": "2.8.1", "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", - "dev": true, - "license": "0BSD", - "optional": true + "license": "0BSD" }, "node_modules/type-check": { "version": "0.4.0", @@ -3760,7 +3852,6 @@ "version": "7.18.2", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.18.2.tgz", "integrity": "sha512-AsuCzffGHJybSaRrmr5eHr81mwJU3kjw6M+uprWvCXiNeN9SOGwQ3Jn8jb8m3Z6izVgknn1R0FTCEAP2QrLY/w==", - "dev": true, "license": "MIT" }, "node_modules/unpipe": { diff --git a/package.json b/package.json index 5a78f29..0f924e7 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,9 @@ "bin": { "agentage": "./dist/cli.js" }, - "files": ["dist"], + "files": [ + "dist" + ], "engines": { "node": ">=22.0.0", "npm": ">=10.0.0" @@ -29,6 +31,7 @@ "dependencies": { "@agentage/core": "^0.2.0", "@agentage/platform": "^0.2.0", + "@supabase/supabase-js": "2.99.3", "chalk": "latest", "commander": "latest", "express": "latest", @@ -43,13 +46,13 @@ "@types/ws": "latest", "@typescript-eslint/eslint-plugin": "latest", "@typescript-eslint/parser": "latest", + "@vitest/coverage-v8": "latest", "eslint": "latest", "eslint-config-prettier": "latest", "eslint-plugin-prettier": "latest", "prettier": "latest", "typescript": "latest", - "vitest": "latest", - "@vitest/coverage-v8": "latest" + "vitest": "latest" }, "repository": { "type": "git", diff --git a/src/commands/login.ts b/src/commands/login.ts index 23e1fb6..67f336f 100644 --- a/src/commands/login.ts +++ b/src/commands/login.ts @@ -1,12 +1,124 @@ import { type Command } from 'commander'; import chalk from 'chalk'; +import open from 'open'; +import { ensureDaemon } from '../utils/ensure-daemon.js'; +import { saveAuth, type AuthState } from '../hub/auth.js'; +import { startCallbackServer, getCallbackPort } from '../hub/auth-callback.js'; +import { loadConfig, saveConfig } from '../daemon/config.js'; + +const DEFAULT_HUB_URL = 'https://agentage.io'; export const registerLogin = (program: Command): void => { program .command('login') .description('Authenticate with hub') - .option('--hub ', 'Hub URL') - .action(() => { - console.log(chalk.yellow('Hub sync not yet available.')); + .option('--hub ', 'Hub URL', DEFAULT_HUB_URL) + .option('--token ', 'Use access token directly (headless/CI)') + .action(async (opts: { hub: string; token?: string }) => { + await ensureDaemon(); + + const hubUrl = opts.hub; + + if (opts.token) { + // Direct token mode — for headless/CI + console.log(chalk.yellow('Direct token login — skipping browser flow')); + console.log( + chalk.yellow( + 'Note: refresh tokens are not available in direct mode. Session will expire.' + ) + ); + + const config = loadConfig(); + const authState: AuthState = { + session: { + access_token: opts.token, + refresh_token: '', + expires_at: 0, + }, + user: { id: '', email: '' }, + hub: { url: hubUrl, machineId: config.machine.id }, + }; + + saveAuth(authState); + + // Save hub URL to config + config.hub = { url: hubUrl }; + saveConfig(config); + + console.log(chalk.green('Logged in with token.')); + return; + } + + // Fetch supabase config from hub health endpoint + let supabaseUrl: string; + let supabaseAnonKey: string; + + try { + const healthRes = await fetch(`${hubUrl}/api/health`); + const health = (await healthRes.json()) as { + success: boolean; + data: { supabaseUrl: string; supabaseAnonKey: string }; + }; + supabaseUrl = health.data.supabaseUrl; + supabaseAnonKey = health.data.supabaseAnonKey; + } catch { + console.error(chalk.red(`Cannot reach hub at ${hubUrl}. Check the URL and try again.`)); + process.exitCode = 1; + return; + } + + // Start callback server, then open browser + console.log('Opening browser for authentication...'); + + const authPromise = startCallbackServer(supabaseUrl, supabaseAnonKey); + + // Wait a tick for the server to start, then get the port + await new Promise((r) => setTimeout(r, 100)); + const port = getCallbackPort(); + + if (!port) { + console.error(chalk.red('Failed to start callback server')); + process.exitCode = 1; + return; + } + + const redirectUrl = `http://localhost:${port}/auth/callback`; + const authUrl = `${supabaseUrl}/auth/v1/authorize?provider=github&redirect_to=${encodeURIComponent(redirectUrl)}`; + + try { + await open(authUrl); + } catch { + // Browser didn't open — print URL manually + console.log(chalk.yellow('Could not open browser. Open this URL manually:')); + console.log(authUrl); + } + + console.log('Waiting for login...'); + + try { + const authState = await authPromise; + + // Set hub info + authState.hub.url = hubUrl; + const config = loadConfig(); + authState.hub.machineId = config.machine.id; + + saveAuth(authState); + + // Save hub URL to config + config.hub = { url: hubUrl }; + saveConfig(config); + + console.log(chalk.green(`✓ Logged in as ${authState.user.email}`)); + console.log( + `Machine "${config.machine.name}" will register with hub on next daemon restart.` + ); + console.log(chalk.dim('Run `agentage daemon restart` to connect now.')); + } catch (err) { + console.error( + chalk.red(`Login failed: ${err instanceof Error ? err.message : String(err)}`) + ); + process.exitCode = 1; + } }); }; diff --git a/src/commands/status.ts b/src/commands/status.ts index 9439d4d..96c9d63 100644 --- a/src/commands/status.ts +++ b/src/commands/status.ts @@ -10,6 +10,8 @@ interface HealthResponse { uptime: number; machineId: string; hubConnected: boolean; + hubUrl: string | null; + userEmail: string | null; } export const registerStatus = (program: Command): void => { @@ -28,9 +30,17 @@ export const registerStatus = (program: Command): void => { console.log(`Daemon: ${chalk.green('running')} (PID ${pid}, port 4243)`); console.log(`Uptime: ${uptime}`); - console.log( - `Hub: ${health.hubConnected ? chalk.green('connected') : chalk.yellow('not connected (standalone mode)')}` - ); + + if (health.hubConnected) { + console.log(`Hub: ${chalk.green('connected')} (${health.hubUrl})`); + console.log(`User: ${health.userEmail}`); + } else if (health.hubUrl) { + console.log(`Hub: ${chalk.yellow('disconnected')} (${health.hubUrl})`); + console.log(`User: ${health.userEmail}`); + } else { + console.log(`Hub: ${chalk.yellow('not connected (standalone mode)')}`); + } + console.log(`Machine: ${health.machineId}`); console.log(`Agents: ${agents.length} discovered`); console.log(`Runs: ${activeRuns} active`); diff --git a/src/daemon-entry.ts b/src/daemon-entry.ts index 2104920..e019dc6 100644 --- a/src/daemon-entry.ts +++ b/src/daemon-entry.ts @@ -5,6 +5,7 @@ import { createDaemonServer } from './daemon/server.js'; import { scanAgents } from './discovery/scanner.js'; import { createMarkdownFactory } from './discovery/markdown-factory.js'; import { createCodeFactory } from './discovery/code-factory.js'; +import { getHubSync } from './hub/hub-sync.js'; const main = async (): Promise => { const config = loadConfig(); @@ -23,8 +24,13 @@ const main = async (): Promise => { await server.start(); logInfo(`Daemon ready on port ${config.daemon.port}`); + // Initialize hub sync (registers + heartbeat if auth.json exists) + const hubSync = getHubSync(); + await hubSync.start(); + const shutdown = async (): Promise => { logInfo('Daemon shutting down...'); + await hubSync.stop(); await server.stop(); removePidFile(); process.exit(0); diff --git a/src/daemon/routes.ts b/src/daemon/routes.ts index be3f146..c83747e 100644 --- a/src/daemon/routes.ts +++ b/src/daemon/routes.ts @@ -2,6 +2,8 @@ import { type Router, Router as createRouter, json } from 'express'; import { type Agent } from '@agentage/core'; import { loadConfig } from './config.js'; import { cancelRun, getRun, getRuns, sendInput, startRun } from './run-manager.js'; +import { getHubSync } from '../hub/hub-sync.js'; +import { readAuth } from '../hub/auth.js'; const VERSION = '0.2.0'; const startTime = Date.now(); @@ -25,12 +27,17 @@ export const createRoutes = (): Router => { router.get('/api/health', (_req, res) => { const config = loadConfig(); + const hubSync = getHubSync(); + const auth = readAuth(); + res.json({ status: 'ok', version: VERSION, uptime: Math.floor((Date.now() - startTime) / 1000), machineId: config.machine.id, - hubConnected: false, + hubConnected: hubSync.isConnected(), + hubUrl: auth?.hub.url ?? null, + userEmail: auth?.user.email ?? null, }); }); diff --git a/src/hub/auth-callback.ts b/src/hub/auth-callback.ts new file mode 100644 index 0000000..bb0dd69 --- /dev/null +++ b/src/hub/auth-callback.ts @@ -0,0 +1,90 @@ +import express from 'express'; +import { createClient } from '@supabase/supabase-js'; +import { type AuthState } from './auth.js'; + +export const startCallbackServer = ( + supabaseUrl: string, + supabaseAnonKey: string +): Promise => + new Promise((resolve, reject) => { + const app = express(); + + const server = app.listen(0, () => { + const addr = server.address(); + const port = typeof addr === 'object' && addr ? addr.port : 0; + + // Timeout after 120 seconds + const timeout = setTimeout(() => { + server.close(); + reject(new Error('Login timed out — no callback received within 120 seconds')); + }, 120_000); + + app.get('/auth/callback', async (req, res) => { + clearTimeout(timeout); + + const code = req.query.code as string | undefined; + + if (!code) { + res.status(400).send('Missing authorization code'); + server.close(); + reject(new Error('Missing authorization code in callback')); + return; + } + + try { + const supabase = createClient(supabaseUrl, supabaseAnonKey); + const { data, error } = await supabase.auth.exchangeCodeForSession(code); + + if (error || !data.session) { + res.status(500).send('Failed to exchange code for session'); + server.close(); + reject(new Error(`Auth exchange failed: ${error?.message ?? 'no session'}`)); + return; + } + + res.send( + '

Login successful!

You can close this window.

' + ); + server.close(); + + resolve({ + session: { + access_token: data.session.access_token, + refresh_token: data.session.refresh_token, + expires_at: data.session.expires_at ?? 0, + }, + user: { + id: data.user.id, + email: data.user.email ?? '', + name: + (data.user.user_metadata?.full_name as string) ?? + (data.user.user_metadata?.name as string) ?? + '', + avatar: (data.user.user_metadata?.avatar_url as string) ?? '', + }, + hub: { + url: '', + machineId: '', + }, + }); + } catch (err) { + res.status(500).send('Auth error'); + server.close(); + reject(err); + } + }); + + // Export port for the caller to build the OAuth URL + (server as { callbackPort?: number }).callbackPort = port; + }); + + // Expose the server for the caller to read the port + (startCallbackServer as { _server?: typeof server })._server = server; + }); + +export const getCallbackPort = (): number => { + const server = (startCallbackServer as { _server?: { address: () => unknown } })._server; + if (!server) return 0; + const addr = server.address(); + return typeof addr === 'object' && addr ? (addr as { port: number }).port : 0; +}; diff --git a/src/hub/auth.test.ts b/src/hub/auth.test.ts new file mode 100644 index 0000000..53bb7a0 --- /dev/null +++ b/src/hub/auth.test.ts @@ -0,0 +1,55 @@ +import { describe, test, expect, beforeEach, afterEach } from 'vitest'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { join } from 'node:path'; +import { tmpdir } from 'node:os'; +import { readAuth, saveAuth, deleteAuth, type AuthState } from './auth.js'; + +describe('Auth', () => { + let tempDir: string; + const originalEnv = process.env['AGENTAGE_CONFIG_DIR']; + + beforeEach(() => { + tempDir = mkdtempSync(join(tmpdir(), 'agentage-auth-test-')); + process.env['AGENTAGE_CONFIG_DIR'] = tempDir; + }); + + afterEach(() => { + if (originalEnv) { + process.env['AGENTAGE_CONFIG_DIR'] = originalEnv; + } else { + delete process.env['AGENTAGE_CONFIG_DIR']; + } + rmSync(tempDir, { recursive: true, force: true }); + }); + + const testAuth: AuthState = { + session: { + access_token: 'test-token', + refresh_token: 'test-refresh', + expires_at: Date.now() + 3600000, + }, + user: { id: 'user-1', email: 'test@test.com' }, + hub: { url: 'https://hub.test', machineId: 'machine-1' }, + }; + + test('readAuth returns null when no file', () => { + expect(readAuth()).toBeNull(); + }); + + test('saveAuth writes and readAuth reads back', () => { + saveAuth(testAuth); + const result = readAuth(); + expect(result).toEqual(testAuth); + }); + + test('deleteAuth removes file', () => { + saveAuth(testAuth); + expect(readAuth()).not.toBeNull(); + deleteAuth(); + expect(readAuth()).toBeNull(); + }); + + test('deleteAuth is safe when no file', () => { + expect(() => deleteAuth()).not.toThrow(); + }); +}); diff --git a/src/hub/auth.ts b/src/hub/auth.ts new file mode 100644 index 0000000..4651681 --- /dev/null +++ b/src/hub/auth.ts @@ -0,0 +1,46 @@ +import { existsSync, readFileSync, writeFileSync, unlinkSync } from 'node:fs'; +import { join } from 'node:path'; +import { getConfigDir } from '../daemon/config.js'; + +export interface AuthState { + session: { + access_token: string; + refresh_token: string; + expires_at: number; + }; + user: { + id: string; + email: string; + name?: string; + avatar?: string; + }; + hub: { + url: string; + machineId: string; + }; +} + +const getAuthPath = (): string => join(getConfigDir(), 'auth.json'); + +export const readAuth = (): AuthState | null => { + const path = getAuthPath(); + if (!existsSync(path)) return null; + + try { + const raw = readFileSync(path, 'utf-8'); + return JSON.parse(raw) as AuthState; + } catch { + return null; + } +}; + +export const saveAuth = (state: AuthState): void => { + writeFileSync(getAuthPath(), JSON.stringify(state, null, 2) + '\n', 'utf-8'); +}; + +export const deleteAuth = (): void => { + const path = getAuthPath(); + if (existsSync(path)) { + unlinkSync(path); + } +}; diff --git a/src/hub/hub-client.ts b/src/hub/hub-client.ts new file mode 100644 index 0000000..7ee09dd --- /dev/null +++ b/src/hub/hub-client.ts @@ -0,0 +1,77 @@ +import { type AuthState } from './auth.js'; + +export interface HubClient { + register: (machineData: { + id: string; + name: string; + platform: string; + arch: string; + daemonVersion: string; + }) => Promise<{ machineId: string }>; + heartbeat: ( + machineId: string, + data: { + agents: Array<{ name: string; description?: string; version?: string; tags?: string[] }>; + activeRunIds: string[]; + } + ) => Promise<{ pendingCommands: unknown[] }>; + deregister: (machineId: string) => Promise; + getMachines: () => Promise; + getAgents: (machineId?: string) => Promise; +} + +export const createHubClient = (hubUrl: string, auth: AuthState): HubClient => { + const headers = (): Record => ({ + 'Content-Type': 'application/json', + Authorization: `Bearer ${auth.session.access_token}`, + }); + + const apiUrl = `${hubUrl}/api`; + + const request = async (method: string, path: string, body?: unknown): Promise => { + const res = await fetch(`${apiUrl}${path}`, { + method, + headers: headers(), + body: body ? JSON.stringify(body) : undefined, + }); + + const json = (await res.json()) as { success: boolean; data?: unknown; error?: unknown }; + + if (!res.ok || !json.success) { + const errMsg = + typeof json.error === 'object' && json.error !== null + ? ((json.error as { message?: string }).message ?? 'Unknown error') + : String(json.error ?? 'Request failed'); + throw new Error(`Hub API error (${res.status}): ${errMsg}`); + } + + return json.data; + }; + + return { + register: async (machineData) => { + const data = await request('POST', '/machines', machineData); + return data as { machineId: string }; + }, + + heartbeat: async (machineId, body) => { + const data = await request('POST', `/machines/${machineId}/heartbeat`, body); + return data as { pendingCommands: unknown[] }; + }, + + deregister: async (machineId) => { + await request('DELETE', `/machines/${machineId}`); + }, + + getMachines: async () => { + const data = await request('GET', '/machines'); + return data as unknown[]; + }, + + getAgents: async (machineId) => { + const path = machineId ? `/agents?machine=${machineId}` : '/agents'; + const data = await request('GET', path); + return data as unknown[]; + }, + }; +}; diff --git a/src/hub/hub-sync.ts b/src/hub/hub-sync.ts new file mode 100644 index 0000000..7b0f9d3 --- /dev/null +++ b/src/hub/hub-sync.ts @@ -0,0 +1,157 @@ +import { platform, arch } from 'node:os'; +import { loadConfig } from '../daemon/config.js'; +import { type AuthState, readAuth, saveAuth } from './auth.js'; +import { createHubClient, type HubClient } from './hub-client.js'; +import { createReconnector, type Reconnector } from './reconnection.js'; +import { logInfo, logWarn } from '../daemon/logger.js'; +import { getAgents } from '../daemon/routes.js'; +import { getRuns } from '../daemon/run-manager.js'; + +const HEARTBEAT_INTERVAL_MS = 30_000; +const DAEMON_VERSION = '0.2.0'; + +export interface HubSync { + start: () => Promise; + stop: () => Promise; + isConnected: () => boolean; +} + +export const createHubSync = (): HubSync => { + let hubClient: HubClient | null = null; + let heartbeatTimer: ReturnType | null = null; + let reconnector: Reconnector | null = null; + let connected = false; + + const register = async (auth: AuthState): Promise => { + const config = loadConfig(); + + hubClient = createHubClient(auth.hub.url, auth); + + const result = await hubClient.register({ + id: config.machine.id, + name: config.machine.name, + platform: platform(), + arch: arch(), + daemonVersion: DAEMON_VERSION, + }); + + // Save machineId in auth + auth.hub.machineId = result.machineId; + saveAuth(auth); + + connected = true; + logInfo(`Registered with hub as machine ${result.machineId}`); + }; + + const startHeartbeat = (auth: AuthState): void => { + if (heartbeatTimer) clearInterval(heartbeatTimer); + + heartbeatTimer = setInterval(async () => { + if (!hubClient || !connected) return; + + try { + const agents = getAgents().map((a) => ({ + name: a.manifest.name, + description: a.manifest.description, + version: a.manifest.version, + tags: a.manifest.tags, + })); + + const activeRunIds = getRuns() + .filter((r) => r.state === 'working' || r.state === 'submitted') + .map((r) => r.id); + + const response = await hubClient.heartbeat(auth.hub.machineId, { + agents, + activeRunIds, + }); + + // Process pending commands + if (response.pendingCommands && Array.isArray(response.pendingCommands)) { + if (response.pendingCommands.length > 0) { + logInfo(`Received ${response.pendingCommands.length} pending command(s) from hub`); + } + } + } catch (err) { + logWarn(`Heartbeat failed: ${err instanceof Error ? err.message : String(err)}`); + } + }, HEARTBEAT_INTERVAL_MS); + }; + + return { + start: async () => { + const auth = readAuth(); + if (!auth) { + logInfo('No auth — running in standalone mode'); + return; + } + + reconnector = createReconnector({ + onReconnect: async () => { + await register(auth); + startHeartbeat(auth); + }, + onError: (err) => { + logWarn( + `Hub connection failed: ${err instanceof Error ? err.message : String(err)}. Retrying...` + ); + }, + }); + + try { + await register(auth); + startHeartbeat(auth); + logInfo(`Connected to hub at ${auth.hub.url}`); + } catch (err) { + logWarn( + `Initial hub connection failed: ${err instanceof Error ? err.message : String(err)}. Will retry.` + ); + connected = false; + reconnector.start(); + } + }, + + stop: async () => { + if (heartbeatTimer) { + clearInterval(heartbeatTimer); + heartbeatTimer = null; + } + + if (reconnector) { + reconnector.stop(); + reconnector = null; + } + + if (hubClient && connected) { + const auth = readAuth(); + if (auth) { + try { + await hubClient.deregister(auth.hub.machineId); + logInfo('Deregistered from hub'); + } catch { + // Best effort + } + } + } + + connected = false; + hubClient = null; + }, + + isConnected: () => connected, + }; +}; + +// Module-level singleton — lazy initialized in daemon-entry +let _hubSync: HubSync | null = null; + +export const getHubSync = (): HubSync => { + if (!_hubSync) { + _hubSync = createHubSync(); + } + return _hubSync; +}; + +export const resetHubSync = (): void => { + _hubSync = null; +}; diff --git a/src/hub/reconnection.test.ts b/src/hub/reconnection.test.ts new file mode 100644 index 0000000..24f2721 --- /dev/null +++ b/src/hub/reconnection.test.ts @@ -0,0 +1,36 @@ +import { describe, test, expect, vi, afterEach } from 'vitest'; +import { createReconnector } from './reconnection.js'; + +describe('Reconnection', () => { + afterEach(() => { + vi.useRealTimers(); + }); + + test('calls onReconnect on start', async () => { + const onReconnect = vi.fn().mockResolvedValue(undefined); + const reconnector = createReconnector({ onReconnect }); + + reconnector.start(); + await vi.waitFor(() => expect(onReconnect).toHaveBeenCalledTimes(1)); + reconnector.stop(); + }); + + test('resets delay after successful connect', async () => { + const onReconnect = vi.fn().mockResolvedValue(undefined); + const reconnector = createReconnector({ onReconnect, initialDelayMs: 100 }); + + reconnector.start(); + await vi.waitFor(() => expect(onReconnect).toHaveBeenCalled()); + reconnector.reset(); + reconnector.stop(); + }); + + test('stop prevents further attempts', () => { + const onReconnect = vi.fn().mockRejectedValue(new Error('fail')); + const reconnector = createReconnector({ onReconnect }); + + reconnector.stop(); + // Should not throw or call onReconnect after stop + expect(true).toBe(true); + }); +}); diff --git a/src/hub/reconnection.ts b/src/hub/reconnection.ts new file mode 100644 index 0000000..2521837 --- /dev/null +++ b/src/hub/reconnection.ts @@ -0,0 +1,55 @@ +export interface Reconnector { + start: () => void; + stop: () => void; + reset: () => void; +} + +export interface ReconnectorOptions { + initialDelayMs?: number; + maxDelayMs?: number; + onReconnect: () => Promise; + onError?: (err: unknown) => void; +} + +export const createReconnector = (opts: ReconnectorOptions): Reconnector => { + const initialDelay = opts.initialDelayMs ?? 1000; + const maxDelay = opts.maxDelayMs ?? 30_000; + + let currentDelay = initialDelay; + let timer: ReturnType | null = null; + let stopped = false; + + const attempt = async (): Promise => { + if (stopped) return; + + try { + await opts.onReconnect(); + currentDelay = initialDelay; + } catch (err) { + opts.onError?.(err); + timer = setTimeout(() => { + attempt(); + }, currentDelay); + currentDelay = Math.min(currentDelay * 2, maxDelay); + } + }; + + return { + start: () => { + stopped = false; + attempt(); + }, + + stop: () => { + stopped = true; + if (timer) { + clearTimeout(timer); + timer = null; + } + }, + + reset: () => { + currentDelay = initialDelay; + }, + }; +};