From 29b4321d1ebb59f4a801e67672d803de8bb273d0 Mon Sep 17 00:00:00 2001 From: Justin Lee Date: Wed, 10 Jun 2026 11:15:17 +0800 Subject: [PATCH] fix: human-readable upstream errors with auto-retry Replace raw Node.js error codes (ETIMEDOUT, ENOTFOUND, etc.) with classified messages showing host, plain-language label, agent version, and recovery hints. Transient errors (ETIMEDOUT, ENOTFOUND, EHOSTUNREACH, ECONNREFUSED, EAI_AGAIN) auto-retry once after 1s before failing. Deduplicate socket+request error logging via setImmediate flag. Co-Authored-By: Claude Opus 4.6 (1M context) --- server/forward.js | 153 ++++++++++++++++++++++++++----------- test/forward-proxy.test.js | 48 ++++++++++++ 2 files changed, 155 insertions(+), 46 deletions(-) diff --git a/server/forward.js b/server/forward.js index 6e4b6f8..7ae17e9 100644 --- a/server/forward.js +++ b/server/forward.js @@ -368,6 +368,44 @@ async function persistEditedRequest(ctx) { } } +// ── Upstream error classification ─────────────────────────────────── +const RETRYABLE_CODES = new Set(['ETIMEDOUT', 'ENOTFOUND', 'EHOSTUNREACH', 'ECONNREFUSED', 'EAI_AGAIN']); + +function describeUpstreamError(err, host) { + const code = err.code || ''; + const labels = { + ETIMEDOUT: 'connection timed out', + ENOTFOUND: 'DNS lookup failed', + EHOSTUNREACH: 'host unreachable', + ECONNREFUSED: 'connection refused', + EAI_AGAIN: 'DNS temporarily unavailable', + ECONNRESET: 'connection reset by peer', + EPIPE: 'broken pipe', + }; + const hints = { + ETIMEDOUT: 'check your network connection', + ENOTFOUND: 'check your network or DNS settings', + EHOSTUNREACH: 'check your network connection', + ECONNREFUSED: 'is the upstream API available?', + EAI_AGAIN: 'DNS will likely recover on its own', + }; + const label = labels[code] || err.message || code || 'unknown error'; + const codeTag = code ? ` (${code})` : ''; + let agentVer = null; + for (const e of store.versionIndex.values()) { + if (e.version && (!agentVer || e.version > agentVer.v)) { + agentVer = { v: e.version, label: e.agentLabel }; + } + } + const verTag = agentVer ? ` [${agentVer.label} ${agentVer.v}]` : ''; + return { + code, + summary: `${host}: ${label}${codeTag}${verTag}`, + hint: hints[code] || null, + retryable: RETRYABLE_CODES.has(code), + }; +} + // ── Forward request to Anthropic ───────────────────────────────────── function forwardRequest(ctx) { const { id, ts, startTime, parsedBody, rawBody, clientReq, clientRes, fwdHeaders, reqSessionId } = ctx; @@ -439,57 +477,79 @@ function forwardRequest(ctx) { const transport = upstream.protocol === 'http' ? http : https; const tunnelAgent = getTunnelAgent(upstream); - const proxyReq = transport.request({ - hostname: upstream.host, port: upstream.port, - path: config.joinUpstreamPath(upstream, stripAuthParams(clientReq.url)), method: clientReq.method, - headers: { ...fwdHeaders, 'content-length': bodyToSend.length }, - ...(tunnelAgent ? { agent: tunnelAgent } : {}), - }, (proxyRes) => { - const isSSE = (proxyRes.headers['content-type'] || '').includes('text/event-stream'); - - // Capture rate limit headers once, share with state + sample log. - const parsedRL = collectRatelimitHeaders(proxyRes.headers); - if (parsedRL && parsedRL.tokensLimit != null) { - store.setRateLimitState({ ...parsedRL, updatedAt: Date.now() }); - } - if (parsedRL) { - appendSample({ - parsed: parsedRL, - model: parsedBody?.model || null, - planHint: process.env.CCXRAY_PLAN || null, - }); - } - clientRes.writeHead(proxyRes.statusCode, proxyRes.headers); - if (isSSE) { - handleSSEResponse(ctx, proxyRes, clientRes); - } else { - handleNonSSEResponse(ctx, proxyRes, clientRes); - } - }); + function sendUpstream(attempt) { + if (clientRes.destroyed) return; + const proxyReq = transport.request({ + hostname: upstream.host, port: upstream.port, + path: config.joinUpstreamPath(upstream, stripAuthParams(clientReq.url)), method: clientReq.method, + headers: { ...fwdHeaders, 'content-length': bodyToSend.length }, + ...(tunnelAgent ? { agent: tunnelAgent } : {}), + }, (proxyRes) => { + const isSSE = (proxyRes.headers['content-type'] || '').includes('text/event-stream'); + + // Capture rate limit headers once, share with state + sample log. + const parsedRL = collectRatelimitHeaders(proxyRes.headers); + if (parsedRL && parsedRL.tokensLimit != null) { + store.setRateLimitState({ ...parsedRL, updatedAt: Date.now() }); + } + if (parsedRL) { + appendSample({ + parsed: parsedRL, + model: parsedBody?.model || null, + planHint: process.env.CCXRAY_PLAN || null, + }); + } + clientRes.writeHead(proxyRes.statusCode, proxyRes.headers); - proxyReq.on('error', (err) => { - console.error(`\x1b[31m❌ PROXY ERROR: ${err.message || err.code || String(err)}\x1b[0m`); - if (reqSessionId) { - store.activeRequests[reqSessionId] = Math.max(0, (store.activeRequests[reqSessionId] || 1) - 1); - broadcastSessionStatus(reqSessionId); - } - if (!clientRes.headersSent) { - clientRes.writeHead(502, { 'Content-Type': 'application/json' }); - } - clientRes.end(JSON.stringify({ error: 'proxy_error', message: err.message })); - }); + if (isSSE) { + handleSSEResponse(ctx, proxyRes, clientRes); + } else { + handleNonSSEResponse(ctx, proxyRes, clientRes); + } + }); + + let reqErrorHandled = false; + proxyReq.on('error', (err) => { + reqErrorHandled = true; + const info = describeUpstreamError(err, upstream.host); + + if (info.retryable && attempt === 0 && !clientRes.headersSent && !clientRes.destroyed) { + console.error(`\x1b[33m⏳ ${info.summary} — retrying…\x1b[0m`); + setTimeout(() => sendUpstream(1), 1000); + return; + } - // Late socket errors (EPIPE / ECONNRESET after the response has been received) - // are emitted on the underlying TLS/TCP socket and may not re-emit on the - // ClientRequest. Without a listener they crash the entire proxy process. - proxyReq.on('socket', (socket) => { - socket.on('error', (err) => { - console.error(`\x1b[31m❌ UPSTREAM SOCKET ERROR: ${err.code || err.message}\x1b[0m`); + const suffix = attempt > 0 ? ' — retry failed' : ''; + console.error(`\x1b[31m❌ ${info.summary}${suffix}\x1b[0m`); + if (info.hint) console.error(`\x1b[31m → ${info.hint}\x1b[0m`); + if (reqSessionId) { + store.activeRequests[reqSessionId] = Math.max(0, (store.activeRequests[reqSessionId] || 1) - 1); + broadcastSessionStatus(reqSessionId); + } + if (!clientRes.headersSent) { + clientRes.writeHead(502, { 'Content-Type': 'application/json' }); + } + clientRes.end(JSON.stringify({ error: 'proxy_error', message: err.message })); }); - }); - proxyReq.end(bodyToSend); + // Late socket errors (EPIPE / ECONNRESET after response received) may not + // re-emit on the ClientRequest. Listener prevents uncaught-exception crash. + // Deferred check avoids duplicate logging when proxyReq 'error' already fired. + proxyReq.on('socket', (socket) => { + socket.on('error', (err) => { + setImmediate(() => { + if (!reqErrorHandled) { + console.error(`\x1b[31m❌ ${upstream.host}: socket error — ${err.code || err.message}\x1b[0m`); + } + }); + }); + }); + + proxyReq.end(bodyToSend); + } + + sendUpstream(0); } function handleSSEResponse(ctx, proxyRes, clientRes) { @@ -931,6 +991,7 @@ module.exports = { applyModelPrefix, stripInjectedStats, buildEditSummary, + describeUpstreamError, setStatusLineEnabled, getStatusLineEnabled, parseSSEFrame, diff --git a/test/forward-proxy.test.js b/test/forward-proxy.test.js index 5d31c7d..6932ac9 100644 --- a/test/forward-proxy.test.js +++ b/test/forward-proxy.test.js @@ -6,6 +6,7 @@ const { resolveProxyAgent, applyModelPrefix, stripInjectedStats, + describeUpstreamError, setStatusLineEnabled, getStatusLineEnabled, parseSSEFrame, @@ -117,6 +118,53 @@ describe('stripInjectedStats', () => { }); }); +describe('describeUpstreamError', () => { + it('classifies ETIMEDOUT as retryable with hint', () => { + const info = describeUpstreamError({ code: 'ETIMEDOUT' }, 'api.anthropic.com'); + assert.equal(info.retryable, true); + assert.match(info.summary, /api\.anthropic\.com.*timed out.*ETIMEDOUT/); + assert.ok(info.hint); + }); + + it('classifies ENOTFOUND as retryable with hint', () => { + const info = describeUpstreamError({ code: 'ENOTFOUND' }, 'api.anthropic.com'); + assert.equal(info.retryable, true); + assert.match(info.summary, /DNS lookup failed/); + assert.ok(info.hint); + }); + + it('classifies EHOSTUNREACH as retryable', () => { + const info = describeUpstreamError({ code: 'EHOSTUNREACH' }, 'example.com'); + assert.equal(info.retryable, true); + assert.match(info.summary, /unreachable/); + }); + + it('classifies ECONNRESET as non-retryable', () => { + const info = describeUpstreamError({ code: 'ECONNRESET' }, 'example.com'); + assert.equal(info.retryable, false); + assert.match(info.summary, /reset/); + assert.equal(info.hint, null); + }); + + it('classifies EPIPE as non-retryable', () => { + const info = describeUpstreamError({ code: 'EPIPE' }, 'example.com'); + assert.equal(info.retryable, false); + }); + + it('falls back to err.message for unknown codes', () => { + const info = describeUpstreamError({ code: 'EWHATEVER', message: 'something broke' }, 'host'); + assert.equal(info.retryable, false); + assert.match(info.summary, /something broke/); + }); + + it('handles missing code gracefully', () => { + const info = describeUpstreamError({ message: 'oops' }, 'host'); + assert.equal(info.retryable, false); + assert.match(info.summary, /oops/); + assert.equal(info.code, ''); + }); +}); + describe('statusLineEnabled flag', () => { beforeEach(() => setStatusLineEnabled(true));