-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathserver.js
More file actions
200 lines (169 loc) · 10.1 KB
/
server.js
File metadata and controls
200 lines (169 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import http from 'http';
import fs from 'fs';
import path from 'path';
import { fileURLToPath } from 'url';
import { LRUCache } from 'lru-cache';
const PKG_VERSION = JSON.parse(fs.readFileSync(new URL('./package.json', import.meta.url), 'utf8')).version;
import { createExpressApp } from './lib/routes-upload.js';
import { queries } from './database.js';
import { runClaudeWithStreaming } from './lib/claude-runner-run.js';
import { initializeDescriptors, getAgentDescriptor } from './lib/agent-descriptors.js';
import { discoverExternalACPServers, initializeAgentDiscovery } from './lib/agent-discovery.js';
import { startGeminiOAuth, exchangeGeminiOAuthCode, handleGeminiOAuthCallback, getGeminiOAuthStatus, getGeminiOAuthState } from './lib/oauth-gemini.js';
import { initSpeechManager, getSpeech, voiceCacheManager, modelDownloadState, ensureModelsDownloaded, eagerTTS } from './lib/speech-manager.js';
import { createRegistry } from './lib/routes-registry.js';
import { BROADCAST_TYPES } from './lib/broadcast.js';
import { startCodexOAuth, exchangeCodexOAuthCode, handleCodexOAuthCallback, getCodexOAuthStatus, getCodexOAuthState, CODEX_HOME, CODEX_AUTH_FILE } from './lib/oauth-codex.js';
import { WSOptimizer } from './lib/ws-optimizer.js';
import { WsRouter } from './lib/ws-protocol.js';
import { encode as wsEncode } from './lib/codec.js';
import { parseBody, acceptsEncoding, compressAndSend, sendJSON } from './lib/http-utils.js';
import { createWsSetup } from './lib/ws-setup.js';
import { createHttpHandler } from './lib/http-handler.js';
import { createOnServerReady } from './lib/server-startup.js';
import { createAutoImport, createDbRecovery, createPluginLoader } from './lib/server-startup2.js';
const sendWs = (ws, obj) => { if (ws.readyState === 1) ws.send(wsEncode(obj)); };
import { startAll as startACPTools, stopAll as stopACPTools, getStatus as getACPStatus, getPort as getACPPort, ensureRunning, queryModels as queryACPModels, touch as touchACP } from './lib/acp-sdk-manager.js';
import * as execMachine from './lib/execution-machine.js';
import * as toolInstallMachine from './lib/tool-install-machine.js';
import { _assetCache, htmlState, generateETag, warmAssetCache, serveFile as _serveFile } from './lib/asset-server.js';
import { installGMAgentConfigs } from './lib/gm-agent-configs.js';
import * as toolManager from './lib/tool-manager.js';
import { pm2Manager } from './lib/pm2-manager.js';
import CheckpointManager from './lib/checkpoint-manager.js';
import { createBroadcast } from './lib/broadcast.js';
import { createRecovery } from './lib/recovery.js';
import { parseRateLimitResetTime } from './lib/process-message-rate-limit.js';
import { createEventHandler } from './lib/stream-event-handler.js';
import { createMessageQueue } from './lib/message-queue.js';
import { createProcessMessage } from './lib/process-message.js';
import { buildSystemPrompt, getProviderConfigs, saveProviderConfig } from './lib/provider-config.js';
import { logError, errLogPath, makeCleanupExecution, makeGetModelsForAgent } from './lib/server-utils.js';
process.on('uncaughtException', (err, origin) => { console.error('[FATAL] Uncaught exception:', err.message, '| origin:', origin); console.error(err.stack); });
process.on('unhandledRejection', (reason) => { console.error('[FATAL] Unhandled rejection:', reason instanceof Error ? reason.message : reason); if (reason instanceof Error) console.error(reason.stack); });
process.on('SIGHUP', () => { console.log('[SIGNAL] SIGHUP received (ignored - uncrashable)'); });
process.on('beforeExit', (code) => { console.log('[PROCESS] beforeExit with code:', code); });
process.on('exit', (code) => { console.log('[PROCESS] exit with code:', code); });
const activeExecutions = new Map();
const activeScripts = new Map();
const messageQueues = new Map();
const rateLimitState = new Map();
let _jsonlWatcher = null;
const activeProcessesByRunId = new Map();
const checkpointManager = new CheckpointManager(queries);
const STUCK_AGENT_THRESHOLD_MS = 1800000;
const NO_PID_GRACE_PERIOD_MS = 60000;
const debugLog = (msg) => {
const timestamp = new Date().toISOString();
console.error(`[${timestamp}] ${msg}`);
};
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const rootDir = process.env.PORTABLE_EXE_DIR || __dirname;
const PORT = process.env.PORT || 3000;
const BASE_URL = (process.env.BASE_URL || '/gm').replace(/\/+/g, '/').replace(/\/+$/, '');
const watch = process.argv.includes('--no-watch') ? false : (process.argv.includes('--watch') || process.env.HOT_RELOAD !== 'false');
const STARTUP_CWD = (() => {
const cwd = process.env.STARTUP_CWD || process.cwd();
try { fs.accessSync(cwd, fs.constants.R_OK); return cwd; } catch { console.warn(`[server] STARTUP_CWD "${cwd}" not accessible, falling back to ${process.cwd()}`); return process.cwd(); }
})();
const staticDir = path.join(rootDir, 'static');
if (!fs.existsSync(staticDir)) fs.mkdirSync(staticDir, { recursive: true });
const expressApp = createExpressApp({ queries, BASE_URL });
let discoveredAgents = [];
initializeDescriptors(discoveredAgents);
const startTime = Date.now();
initializeAgentDiscovery(discoveredAgents, rootDir, logError).then(() => {
initializeDescriptors(discoveredAgents);
console.log('[INIT] initializeAgentDiscovery completed in', Date.now() - startTime, 'ms');
}).catch(() => {});
const modelCache = new Map();
const getModelsForAgent = makeGetModelsForAgent({ modelCache, discoveredAgents, ensureRunning, queryACPModels });
const _rateLimitMap = new LRUCache({ max: 1000, ttl: 60000 });
const RATE_LIMIT_MAX = parseInt(process.env.RATE_LIMIT_MAX || '300', 10);
const _assetDeps = { compressAndSend, acceptsEncoding, watch, BASE_URL, PKG_VERSION };
function serveFile(filePath, res, req) { return _serveFile(filePath, res, req, _assetDeps); }
const _routes = {};
const server = http.createServer(createHttpHandler({ BASE_URL, expressApp, queries, sendJSON, serveFile, staticDir, messageQueues, getWss: () => wss, activeExecutions, getACPStatus, discoveredAgents, PKG_VERSION, RATE_LIMIT_MAX, rateLimitMap: _rateLimitMap, routes: _routes, handleGeminiOAuthCallback, handleCodexOAuthCallback, PORT }));
let broadcastSeq = 0;
const syncClients = new Set();
const subscriptionIndex = new Map();
const pm2Subscribers = new Set();
const wsOptimizer = new WSOptimizer();
const broadcastSync = createBroadcast({
syncClients,
subscriptionIndex,
wsOptimizer,
broadcastTypes: BROADCAST_TYPES,
getSeq: () => ++broadcastSeq
});
const cleanupExecution = makeCleanupExecution({ execMachine, activeExecutions, queries, broadcastSync, debugLog });
const { scheduleRetry, drainMessageQueue } = createMessageQueue({ queries, messageQueues, activeExecutions, rateLimitState, execMachine, broadcastSync, cleanupExecution, debugLog, getProcessMessageWithStreaming: () => processMessageWithStreaming });
const { processMessageWithStreaming } = createProcessMessage({
queries, activeExecutions, rateLimitState, execMachine,
broadcastSync, runClaudeWithStreaming, cleanupExecution, checkpointManager,
discoveredAgents, STARTUP_CWD, buildSystemPrompt,
parseRateLimitResetTime, eagerTTS, touchACP,
getJsonlWatcher: () => _jsonlWatcher,
debugLog, logError,
scheduleRetry, drainMessageQueue, createEventHandler
});
const wsRouter = new WsRouter();
createRegistry(wsRouter, { queries, sendJSON, parseBody, broadcastSync, debugLog, PORT, BASE_URL, rootDir, STARTUP_CWD, PKG_VERSION, processMessageWithStreaming, activeExecutions, activeProcessesByRunId, activeScripts, messageQueues, rateLimitState, cleanupExecution, discoveredAgents, getACPStatus, modelCache, getModelsForAgent, logError, toolManager, syncClients, wsOptimizer, errLogPath, getJsonlWatcher: () => getJsonlWatcher(), routes: _routes });
const { wss, hotReloadClients } = createWsSetup(server, {
BASE_URL, watch, staticDir, _assetCache, htmlState, sendWs, wsRouter, debugLog,
subscriptionIndex, syncClients, pm2Subscribers, wsOptimizer,
legacyDeps: {
subscriptionIndex, execMachine, activeExecutions, messageQueues,
checkpointManager, queries, pm2Manager, pm2Subscribers,
getSeq: () => ++broadcastSeq, sendWs, debugLog
}
});
const { killActiveExecutions, recoverStaleSessions, resumeInterruptedStreams, isProcessAlive, markAgentDead, resumeConversation, performAgentHealthCheck } = createRecovery({
activeExecutions,
processMessageWithStreaming,
queries,
broadcastSync,
checkpointManager,
drainMessageQueue,
stuckThresholdMs: STUCK_AGENT_THRESHOLD_MS,
noPidGracePeriodMs: NO_PID_GRACE_PERIOD_MS
});
process.on('SIGTERM', () => {
console.log('[SIGNAL] SIGTERM received - graceful shutdown');
killActiveExecutions();
const _jw = getJsonlWatcher(); if (_jw) try { _jw.stop(); } catch (_) {}
try { pm2Manager.disconnect(); } catch (_) {}
stopACPTools().catch(() => {}).finally(() => {
try { wss.close(() => server.close(() => process.exit(0))); } catch (_) { process.exit(0); }
setTimeout(() => process.exit(1), 5000);
});
});
process.on('SIGINT', () => {
killActiveExecutions();
process.exit(0);
});
server.on('error', (err) => {
if (err.code === 'EADDRINUSE') {
console.error(`Port ${PORT} already in use. Waiting 3 seconds before retry...`);
setTimeout(() => {
server.listen(PORT, onServerReady);
}, 3000);
} else {
console.error('[SERVER] Error (contained):', err.message);
}
});
const { performAutoImport } = createAutoImport({ queries, broadcastSync });
const { performDbRecovery } = createDbRecovery({ queries, debugLog });
const { loadPluginExtensions } = createPluginLoader({ pluginsDir: path.join(__dirname, 'lib', 'plugins'), expressApp, BASE_URL });
setInterval(performDbRecovery, 300000);
const { onServerReady, getJsonlWatcher } = createOnServerReady({
queries, broadcastSync, warmAssetCache, staticDir, toolManager, discoveredAgents,
PORT, BASE_URL, watch, setWatcher: (w) => { _jsonlWatcher = w; }, resumeInterruptedStreams, activeExecutions,
debugLog, installGMAgentConfigs, startACPTools, getACPStatus, execMachine,
toolInstallMachine, getSpeech, ensureModelsDownloaded, performAutoImport,
performAgentHealthCheck, pm2Manager, pm2Subscribers, recoverStaleSessions
});
server.listen(PORT, () => {
onServerReady();
loadPluginExtensions();
});