Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 18 additions & 82 deletions utils.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// utils.js
import { Readable } from 'stream';

export function processPath(originalPath) {
Expand All @@ -9,101 +8,33 @@ export function processPath(originalPath) {
return `/v1beta/${path}`;
}

export function getApiKeys(req) {
const keyParam = req.query.key || '';
if (!keyParam) return [];
return keyParam.split(';').filter(Boolean);
}

export async function handleSSEResponse(response, res, req) {
if (!response.body) {
throw new Error('Response body is undefined');
}

const stream = Readable.from(response.body);
let buffer = '';
let lastChunk = null;

const processChunk = (chunk) => {
try {
return JSON.parse(chunk);
} catch (e) {
return chunk;
}
};

const writeData = (data) => {
if (typeof data === 'object') {
res.write(`data: ${JSON.stringify(data)}\n\n`);
} else {
res.write(`data: ${data}\n\n`);
}
};

const handleLine = (line) => {
if (!line.startsWith('data: ')) return false;

const data = line.slice(6);
if (!data) return false;

if (data === '[DONE]') {
if (lastChunk) {
const currentContent = buffer;
const hasNewlineInPrevious = lastChunk.endsWith('\n');
const hasNewlineInCurrent = currentContent.startsWith('\n');

// 检查是否存在换行符并且内容重复
if ((hasNewlineInPrevious || hasNewlineInCurrent) &&
currentContent.length >= 3 &&
lastChunk.endsWith(currentContent.slice(0, -6))) {
stream.on('data', (chunk) => {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') {
res.write('data: [DONE]\n\n');
return true;
continue;
}

// 如果内容不重复,发送最后的数据
const processedData = processChunk(currentContent);
if (processedData && processedData !== '[DONE]') {
writeData(processedData);
try {
const parsedData = JSON.parse(data);
res.write(`data: ${JSON.stringify(parsedData)}\n\n`);
} catch (e) {
res.write(`data: ${data}\n\n`);
}
}
res.write('data: [DONE]\n\n');
return true;
}

if (lastChunk) {
const processedData = processChunk(lastChunk);
if (processedData) {
writeData(processedData);
}
}

lastChunk = data;
return false;
};

stream.on('data', (chunk) => {
const text = chunk.toString();
buffer += text;

// 按行分割处理数据
const lines = buffer.split('\n');
// 保留最后一个可能不完整的行
buffer = lines.pop() || '';

// 处理完整的行
for (const line of lines) {
if (handleLine(line.trim())) {
return;
}
}
});

stream.on('end', () => {
// 处理缓冲区中剩余的数据
if (buffer) {
const line = buffer.trim();
handleLine(line);
}
res.end();
});

Expand All @@ -112,8 +43,13 @@ export async function handleSSEResponse(response, res, req) {
res.end();
});

// 当请求被客户端终止时清理流
req.on('close', () => {
stream.destroy();
});
}

export function getApiKeys(req) {
const keyParam = req.query.key || '';
if (!keyParam) return [];
return keyParam.split(';').filter(Boolean);
}