Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 107 additions & 46 deletions server/forward.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,44 @@ async function persistEditedRequest(ctx) {
}
}

// ── Upstream error classification ───────────────────────────────────
const RETRYABLE_CODES = new Set(['ETIMEDOUT', 'ENOTFOUND', 'EHOSTUNREACH', 'ECONNREFUSED', 'EAI_AGAIN']);

function describeUpstreamError(err, host) {
const code = err.code || '';
const labels = {
ETIMEDOUT: 'connection timed out',
ENOTFOUND: 'DNS lookup failed',
EHOSTUNREACH: 'host unreachable',
ECONNREFUSED: 'connection refused',
EAI_AGAIN: 'DNS temporarily unavailable',
ECONNRESET: 'connection reset by peer',
EPIPE: 'broken pipe',
};
const hints = {
ETIMEDOUT: 'check your network connection',
ENOTFOUND: 'check your network or DNS settings',
EHOSTUNREACH: 'check your network connection',
ECONNREFUSED: 'is the upstream API available?',
EAI_AGAIN: 'DNS will likely recover on its own',
};
const label = labels[code] || err.message || code || 'unknown error';
const codeTag = code ? ` (${code})` : '';
let agentVer = null;
for (const e of store.versionIndex.values()) {
if (e.version && (!agentVer || e.version > agentVer.v)) {
agentVer = { v: e.version, label: e.agentLabel };
}
}
const verTag = agentVer ? ` [${agentVer.label} ${agentVer.v}]` : '';
return {
code,
summary: `${host}: ${label}${codeTag}${verTag}`,
hint: hints[code] || null,
retryable: RETRYABLE_CODES.has(code),
};
}

// ── Forward request to Anthropic ─────────────────────────────────────
function forwardRequest(ctx) {
const { id, ts, startTime, parsedBody, rawBody, clientReq, clientRes, fwdHeaders, reqSessionId } = ctx;
Expand Down Expand Up @@ -439,57 +477,79 @@ function forwardRequest(ctx) {

const transport = upstream.protocol === 'http' ? http : https;
const tunnelAgent = getTunnelAgent(upstream);
const proxyReq = transport.request({
hostname: upstream.host, port: upstream.port,
path: config.joinUpstreamPath(upstream, stripAuthParams(clientReq.url)), method: clientReq.method,
headers: { ...fwdHeaders, 'content-length': bodyToSend.length },
...(tunnelAgent ? { agent: tunnelAgent } : {}),
}, (proxyRes) => {
const isSSE = (proxyRes.headers['content-type'] || '').includes('text/event-stream');

// Capture rate limit headers once, share with state + sample log.
const parsedRL = collectRatelimitHeaders(proxyRes.headers);
if (parsedRL && parsedRL.tokensLimit != null) {
store.setRateLimitState({ ...parsedRL, updatedAt: Date.now() });
}
if (parsedRL) {
appendSample({
parsed: parsedRL,
model: parsedBody?.model || null,
planHint: process.env.CCXRAY_PLAN || null,
});
}
clientRes.writeHead(proxyRes.statusCode, proxyRes.headers);

if (isSSE) {
handleSSEResponse(ctx, proxyRes, clientRes);
} else {
handleNonSSEResponse(ctx, proxyRes, clientRes);
}
});
function sendUpstream(attempt) {
if (clientRes.destroyed) return;
const proxyReq = transport.request({
hostname: upstream.host, port: upstream.port,
path: config.joinUpstreamPath(upstream, stripAuthParams(clientReq.url)), method: clientReq.method,
headers: { ...fwdHeaders, 'content-length': bodyToSend.length },
...(tunnelAgent ? { agent: tunnelAgent } : {}),
}, (proxyRes) => {
const isSSE = (proxyRes.headers['content-type'] || '').includes('text/event-stream');

// Capture rate limit headers once, share with state + sample log.
const parsedRL = collectRatelimitHeaders(proxyRes.headers);
if (parsedRL && parsedRL.tokensLimit != null) {
store.setRateLimitState({ ...parsedRL, updatedAt: Date.now() });
}
if (parsedRL) {
appendSample({
parsed: parsedRL,
model: parsedBody?.model || null,
planHint: process.env.CCXRAY_PLAN || null,
});
}
clientRes.writeHead(proxyRes.statusCode, proxyRes.headers);

proxyReq.on('error', (err) => {
console.error(`\x1b[31m❌ PROXY ERROR: ${err.message || err.code || String(err)}\x1b[0m`);
if (reqSessionId) {
store.activeRequests[reqSessionId] = Math.max(0, (store.activeRequests[reqSessionId] || 1) - 1);
broadcastSessionStatus(reqSessionId);
}
if (!clientRes.headersSent) {
clientRes.writeHead(502, { 'Content-Type': 'application/json' });
}
clientRes.end(JSON.stringify({ error: 'proxy_error', message: err.message }));
});
if (isSSE) {
handleSSEResponse(ctx, proxyRes, clientRes);
} else {
handleNonSSEResponse(ctx, proxyRes, clientRes);
}
});

let reqErrorHandled = false;
proxyReq.on('error', (err) => {
reqErrorHandled = true;
const info = describeUpstreamError(err, upstream.host);

if (info.retryable && attempt === 0 && !clientRes.headersSent && !clientRes.destroyed) {
console.error(`\x1b[33m⏳ ${info.summary} — retrying…\x1b[0m`);
setTimeout(() => sendUpstream(1), 1000);
return;
}

// Late socket errors (EPIPE / ECONNRESET after the response has been received)
// are emitted on the underlying TLS/TCP socket and may not re-emit on the
// ClientRequest. Without a listener they crash the entire proxy process.
proxyReq.on('socket', (socket) => {
socket.on('error', (err) => {
console.error(`\x1b[31m❌ UPSTREAM SOCKET ERROR: ${err.code || err.message}\x1b[0m`);
const suffix = attempt > 0 ? ' — retry failed' : '';
console.error(`\x1b[31m❌ ${info.summary}${suffix}\x1b[0m`);
if (info.hint) console.error(`\x1b[31m → ${info.hint}\x1b[0m`);
if (reqSessionId) {
store.activeRequests[reqSessionId] = Math.max(0, (store.activeRequests[reqSessionId] || 1) - 1);
broadcastSessionStatus(reqSessionId);
}
if (!clientRes.headersSent) {
clientRes.writeHead(502, { 'Content-Type': 'application/json' });
}
clientRes.end(JSON.stringify({ error: 'proxy_error', message: err.message }));
});
});

proxyReq.end(bodyToSend);
// Late socket errors (EPIPE / ECONNRESET after response received) may not
// re-emit on the ClientRequest. Listener prevents uncaught-exception crash.
// Deferred check avoids duplicate logging when proxyReq 'error' already fired.
proxyReq.on('socket', (socket) => {
socket.on('error', (err) => {
setImmediate(() => {
if (!reqErrorHandled) {
console.error(`\x1b[31m❌ ${upstream.host}: socket error — ${err.code || err.message}\x1b[0m`);
}
});
});
});

proxyReq.end(bodyToSend);
}

sendUpstream(0);
}

function handleSSEResponse(ctx, proxyRes, clientRes) {
Expand Down Expand Up @@ -931,6 +991,7 @@ module.exports = {
applyModelPrefix,
stripInjectedStats,
buildEditSummary,
describeUpstreamError,
setStatusLineEnabled,
getStatusLineEnabled,
parseSSEFrame,
Expand Down
48 changes: 48 additions & 0 deletions test/forward-proxy.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
resolveProxyAgent,
applyModelPrefix,
stripInjectedStats,
describeUpstreamError,
setStatusLineEnabled,
getStatusLineEnabled,
parseSSEFrame,
Expand Down Expand Up @@ -117,6 +118,53 @@ describe('stripInjectedStats', () => {
});
});

describe('describeUpstreamError', () => {
it('classifies ETIMEDOUT as retryable with hint', () => {
const info = describeUpstreamError({ code: 'ETIMEDOUT' }, 'api.anthropic.com');
assert.equal(info.retryable, true);
assert.match(info.summary, /api\.anthropic\.com.*timed out.*ETIMEDOUT/);
assert.ok(info.hint);
});

it('classifies ENOTFOUND as retryable with hint', () => {
const info = describeUpstreamError({ code: 'ENOTFOUND' }, 'api.anthropic.com');
assert.equal(info.retryable, true);
assert.match(info.summary, /DNS lookup failed/);
assert.ok(info.hint);
});

it('classifies EHOSTUNREACH as retryable', () => {
const info = describeUpstreamError({ code: 'EHOSTUNREACH' }, 'example.com');
assert.equal(info.retryable, true);
assert.match(info.summary, /unreachable/);
});

it('classifies ECONNRESET as non-retryable', () => {
const info = describeUpstreamError({ code: 'ECONNRESET' }, 'example.com');
assert.equal(info.retryable, false);
assert.match(info.summary, /reset/);
assert.equal(info.hint, null);
});

it('classifies EPIPE as non-retryable', () => {
const info = describeUpstreamError({ code: 'EPIPE' }, 'example.com');
assert.equal(info.retryable, false);
});

it('falls back to err.message for unknown codes', () => {
const info = describeUpstreamError({ code: 'EWHATEVER', message: 'something broke' }, 'host');
assert.equal(info.retryable, false);
assert.match(info.summary, /something broke/);
});

it('handles missing code gracefully', () => {
const info = describeUpstreamError({ message: 'oops' }, 'host');
assert.equal(info.retryable, false);
assert.match(info.summary, /oops/);
assert.equal(info.code, '');
});
});

describe('statusLineEnabled flag', () => {
beforeEach(() => setStatusLineEnabled(true));

Expand Down
Loading