From a4dc5a2795d8dd16d99ee6e414e5164ad5578320 Mon Sep 17 00:00:00 2001 From: TAisBUG <139118923+TAisBUG@users.noreply.github.com> Date: Sat, 19 Oct 2024 11:22:36 +0800 Subject: [PATCH 1/6] Update utils.js --- utils.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils.js b/utils.js index 2f1fda7..08fa576 100644 --- a/utils.js +++ b/utils.js @@ -54,7 +54,7 @@ export async function handleSSEResponse(response, res, req) { // 检查是否存在换行符并且内容重复 if ((hasNewlineInPrevious || hasNewlineInCurrent) && - currentContent.length < 1 && + currentContent.length > 5 && lastChunk.endsWith(currentContent.slice(0, -6))) { res.write('data: [DONE]\n\n'); return true; From e2d39a89d40103d992919ec3a9993cf24faecf53 Mon Sep 17 00:00:00 2001 From: TAisBUG <139118923+TAisBUG@users.noreply.github.com> Date: Sat, 19 Oct 2024 14:34:44 +0800 Subject: [PATCH 2/6] Update index.js --- index.js | 141 ++++++++++++++++++++++++++++++++----------------------- 1 file changed, 81 insertions(+), 60 deletions(-) diff --git a/index.js b/index.js index b52d7fb..1e36616 100644 --- a/index.js +++ b/index.js @@ -1,77 +1,98 @@ -import express from 'express'; -import fetch from 'node-fetch'; -import cors from 'cors'; -import dotenv from 'dotenv'; -import { processPath, handleSSEResponse, getApiKeys } from './utils.js'; +import { Readable } from 'stream'; -dotenv.config(); +export function processPath(originalPath) { + const path = originalPath.startsWith('/') ? originalPath.slice(1) : originalPath; + if (path.startsWith('v1beta/') || path === 'v1beta') { + return `/${path}`; + } + return `/v1beta/${path}`; +} -const app = express(); -const port = process.env.PORT || 3000; +export async function handleSSEResponse(response, res, req) { + if (!response.body) { + throw new Error('Response body is undefined'); + } -const TELEGRAPH_URL = 'https://generativelanguage.googleapis.com/v1beta'; + const stream = Readable.from(response.body); + let lastContent = ''; // 存储上一次的完整内容 -app.use(cors()); -app.use(express.json()); + stream.on('data', (chunk) => { + const lines = chunk.toString().split('\n'); + for (const line of lines) { + if (line.startsWith('data: ')) { + const data = line.slice(6); + if (data === '[DONE]') { + res.write('data: [DONE]\n\n'); + continue; + } -app.get('/health', (req, res) => { - res.status(200).json({ status: 'OK' }); -}); + try { + const currentData = JSON.parse(data); + const currentContent = getCurrentContent(currentData); -app.all('*', async (req, res) => { - try { - const processedPath = processPath(req.path); - const targetURL = new URL(TELEGRAPH_URL.replace(/\/v1beta$/, '') + processedPath); - - const apiKeys = getApiKeys(req); - if (apiKeys.length > 0) { - const selectedKey = apiKeys[Math.floor(Math.random() * apiKeys.length)]; - targetURL.searchParams.set('key', selectedKey); - } + // 如果是空内容,跳过 + if (!currentContent) { + continue; + } - for (const [key, value] of Object.entries(req.query)) { - if (key !== 'key') { - targetURL.searchParams.set(key, value); - } - } + // 检查当前内容是否是上一次内容末尾的重复 + if (lastContent && isEndingDuplicate(lastContent, currentContent)) { + console.log('Detected and skipped duplicate ending:', currentContent); + continue; + } - const fetchOptions = { - method: req.method, - headers: { - 'Content-Type': 'application/json', - }, - }; + // 不是末尾重复,更新lastContent并发送数据 + lastContent = currentContent; + res.write(`data: ${JSON.stringify(currentData)}\n\n`); - if (['POST', 'PUT', 'PATCH'].includes(req.method)) { - fetchOptions.body = JSON.stringify(req.body); + } catch (e) { + console.error('Error processing chunk:', e); + res.write(`data: ${data}\n\n`); + } + } } + }); - const isSSE = req.query.alt === 'sse'; + stream.on('end', () => { + res.end(); + }); - const response = await fetch(targetURL.toString(), fetchOptions); + stream.on('error', (error) => { + console.error('Stream processing error:', error); + res.end(); + }); - if (!response.ok) { - const error = await response.text(); - throw new Error(`HTTP error! status: ${response.status}, message: ${error}`); - } + req.on('close', () => { + stream.destroy(); + }); +} - if (isSSE) { - res.setHeader('Content-Type', 'text/event-stream'); - res.setHeader('Cache-Control', 'no-cache'); - res.setHeader('Connection', 'keep-alive'); - - await handleSSEResponse(response, res, req); - } else { - const data = await response.json(); - res.status(response.status).json(data); - } +// 从响应数据中提取实际内容 +function getCurrentContent(data) { + try { + return data.candidates[0].content.parts[0].text; + } catch (e) { + return null; + } +} - } catch (error) { - console.error('Proxy error:', error); - res.status(500).json({ error: 'Internal Server Error', details: error.message }); +// 检查是否是末尾重复内容 +function isEndingDuplicate(lastContent, currentContent) { + // 如果当前内容长度大于上次内容,显然不是重复 + if (currentContent.length > lastContent.length) { + return false; } -}); -app.listen(port, () => { - console.log(`Proxy server running on port ${port}`); -}); + // 检查当前内容是否出现在上次内容的末尾 + const endPosition = lastContent.length - currentContent.length; + const endingPart = lastContent.substring(endPosition); + + // 只有当前内容完全匹配上次内容的末尾部分时,才认为是重复 + return endingPart === currentContent; +} + +export function getApiKeys(req) { + const keyParam = req.query.key || ''; + if (!keyParam) return []; + return keyParam.split(';').filter(Boolean); +} From 29666799d96ff8e52ee1917b6f5955cc96f62bdb Mon Sep 17 00:00:00 2001 From: TAisBUG <139118923+TAisBUG@users.noreply.github.com> Date: Sat, 19 Oct 2024 14:43:35 +0800 Subject: [PATCH 3/6] Update .env From 235c93890f10d6c72a36a6dcdfbb0509a66d8a7a Mon Sep 17 00:00:00 2001 From: TAisBUG <139118923+TAisBUG@users.noreply.github.com> Date: Sat, 19 Oct 2024 14:43:57 +0800 Subject: [PATCH 4/6] Update package.json --- package.json | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index 8ad1afa..74ed79d 100644 --- a/package.json +++ b/package.json @@ -1,21 +1,20 @@ { - "name": "google-ai-proxy", + "name": "gemini-proxy", "version": "1.0.0", - "description": "Proxy server for Google's Generative Language API", - "main": "index.js", "type": "module", + "main": "index.js", "scripts": { "start": "node index.js", "dev": "nodemon index.js" }, "dependencies": { - "express": "^4.18.2", - "node-fetch": "^3.3.2", "cors": "^2.8.5", - "dotenv": "^16.3.1" + "dotenv": "^16.0.3", + "express": "^4.18.2", + "node-fetch": "^3.3.0" }, "devDependencies": { - "nodemon": "^3.0.1" + "nodemon": "^2.0.22" }, "engines": { "node": ">=18.0.0" From 95984379e857ee9931a6026d4b8379a2983ecb81 Mon Sep 17 00:00:00 2001 From: TAisBUG <139118923+TAisBUG@users.noreply.github.com> Date: Sat, 19 Oct 2024 14:44:20 +0800 Subject: [PATCH 5/6] Update index.js --- index.js | 141 +++++++++++++++++++++++-------------------------------- 1 file changed, 60 insertions(+), 81 deletions(-) diff --git a/index.js b/index.js index 1e36616..b52d7fb 100644 --- a/index.js +++ b/index.js @@ -1,98 +1,77 @@ -import { Readable } from 'stream'; +import express from 'express'; +import fetch from 'node-fetch'; +import cors from 'cors'; +import dotenv from 'dotenv'; +import { processPath, handleSSEResponse, getApiKeys } from './utils.js'; -export function processPath(originalPath) { - const path = originalPath.startsWith('/') ? originalPath.slice(1) : originalPath; - if (path.startsWith('v1beta/') || path === 'v1beta') { - return `/${path}`; - } - return `/v1beta/${path}`; -} +dotenv.config(); -export async function handleSSEResponse(response, res, req) { - if (!response.body) { - throw new Error('Response body is undefined'); - } +const app = express(); +const port = process.env.PORT || 3000; - const stream = Readable.from(response.body); - let lastContent = ''; // 存储上一次的完整内容 +const TELEGRAPH_URL = 'https://generativelanguage.googleapis.com/v1beta'; - stream.on('data', (chunk) => { - const lines = chunk.toString().split('\n'); - for (const line of lines) { - if (line.startsWith('data: ')) { - const data = line.slice(6); - if (data === '[DONE]') { - res.write('data: [DONE]\n\n'); - continue; - } +app.use(cors()); +app.use(express.json()); - try { - const currentData = JSON.parse(data); - const currentContent = getCurrentContent(currentData); +app.get('/health', (req, res) => { + res.status(200).json({ status: 'OK' }); +}); - // 如果是空内容,跳过 - if (!currentContent) { - continue; - } +app.all('*', async (req, res) => { + try { + const processedPath = processPath(req.path); + const targetURL = new URL(TELEGRAPH_URL.replace(/\/v1beta$/, '') + processedPath); + + const apiKeys = getApiKeys(req); + if (apiKeys.length > 0) { + const selectedKey = apiKeys[Math.floor(Math.random() * apiKeys.length)]; + targetURL.searchParams.set('key', selectedKey); + } - // 检查当前内容是否是上一次内容末尾的重复 - if (lastContent && isEndingDuplicate(lastContent, currentContent)) { - console.log('Detected and skipped duplicate ending:', currentContent); - continue; - } + for (const [key, value] of Object.entries(req.query)) { + if (key !== 'key') { + targetURL.searchParams.set(key, value); + } + } - // 不是末尾重复,更新lastContent并发送数据 - lastContent = currentContent; - res.write(`data: ${JSON.stringify(currentData)}\n\n`); + const fetchOptions = { + method: req.method, + headers: { + 'Content-Type': 'application/json', + }, + }; - } catch (e) { - console.error('Error processing chunk:', e); - res.write(`data: ${data}\n\n`); - } - } + if (['POST', 'PUT', 'PATCH'].includes(req.method)) { + fetchOptions.body = JSON.stringify(req.body); } - }); - stream.on('end', () => { - res.end(); - }); + const isSSE = req.query.alt === 'sse'; - stream.on('error', (error) => { - console.error('Stream processing error:', error); - res.end(); - }); + const response = await fetch(targetURL.toString(), fetchOptions); - req.on('close', () => { - stream.destroy(); - }); -} + if (!response.ok) { + const error = await response.text(); + throw new Error(`HTTP error! status: ${response.status}, message: ${error}`); + } -// 从响应数据中提取实际内容 -function getCurrentContent(data) { - try { - return data.candidates[0].content.parts[0].text; - } catch (e) { - return null; - } -} + if (isSSE) { + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + + await handleSSEResponse(response, res, req); + } else { + const data = await response.json(); + res.status(response.status).json(data); + } -// 检查是否是末尾重复内容 -function isEndingDuplicate(lastContent, currentContent) { - // 如果当前内容长度大于上次内容,显然不是重复 - if (currentContent.length > lastContent.length) { - return false; + } catch (error) { + console.error('Proxy error:', error); + res.status(500).json({ error: 'Internal Server Error', details: error.message }); } +}); - // 检查当前内容是否出现在上次内容的末尾 - const endPosition = lastContent.length - currentContent.length; - const endingPart = lastContent.substring(endPosition); - - // 只有当前内容完全匹配上次内容的末尾部分时,才认为是重复 - return endingPart === currentContent; -} - -export function getApiKeys(req) { - const keyParam = req.query.key || ''; - if (!keyParam) return []; - return keyParam.split(';').filter(Boolean); -} +app.listen(port, () => { + console.log(`Proxy server running on port ${port}`); +}); From edab412ae8a18e133376de834f2bc2f5be1ec27a Mon Sep 17 00:00:00 2001 From: TAisBUG <139118923+TAisBUG@users.noreply.github.com> Date: Sat, 19 Oct 2024 14:44:40 +0800 Subject: [PATCH 6/6] Update utils.js --- utils.js | 141 +++++++++++++++++++++++++------------------------------ 1 file changed, 65 insertions(+), 76 deletions(-) diff --git a/utils.js b/utils.js index 08fa576..924a6b9 100644 --- a/utils.js +++ b/utils.js @@ -1,4 +1,3 @@ -// utils.js import { Readable } from 'stream'; export function processPath(originalPath) { @@ -9,100 +8,70 @@ export function processPath(originalPath) { return `/v1beta/${path}`; } -export function getApiKeys(req) { - const keyParam = req.query.key || ''; - if (!keyParam) return []; - return keyParam.split(';').filter(Boolean); -} - export async function handleSSEResponse(response, res, req) { if (!response.body) { throw new Error('Response body is undefined'); } const stream = Readable.from(response.body); - let buffer = ''; - let lastChunk = null; + let lastContent = null; // 用于存储上一次的内容 + let buffer = ''; // 用于处理跨块的数据 - const processChunk = (chunk) => { - try { - return JSON.parse(chunk); - } catch (e) { - return chunk; - } - }; - - const writeData = (data) => { - if (typeof data === 'object') { - res.write(`data: ${JSON.stringify(data)}\n\n`); - } else { - res.write(`data: ${data}\n\n`); - } - }; - - const handleLine = (line) => { - if (!line.startsWith('data: ')) return false; + stream.on('data', (chunk) => { + buffer += chunk.toString(); + const lines = buffer.split('\n'); - const data = line.slice(6); - if (!data) return false; - - if (data === '[DONE]') { - if (lastChunk) { - const currentContent = buffer; - const hasNewlineInPrevious = lastChunk.endsWith('\n'); - const hasNewlineInCurrent = currentContent.startsWith('\n'); + // 保留最后一行,因为它可能是不完整的 + buffer = lines.pop() || ''; - // 检查是否存在换行符并且内容重复 - if ((hasNewlineInPrevious || hasNewlineInCurrent) && - currentContent.length > 5 && - lastChunk.endsWith(currentContent.slice(0, -6))) { + for (const line of lines) { + if (line.startsWith('data: ')) { + const data = line.slice(6); + if (data === '[DONE]') { res.write('data: [DONE]\n\n'); - return true; + continue; } - // 如果内容不重复,发送最后的数据 - const processedData = processChunk(currentContent); - if (processedData && processedData !== '[DONE]') { - writeData(processedData); + try { + const parsedData = JSON.parse(data); + // 从响应中提取实际内容 + const content = extractContent(parsedData); + + // 检查是否是重复内容 + if (lastContent && isRepeatContent(content, lastContent)) { + continue; // 跳过重复内容 + } + + // 更新最后发送的内容 + lastContent = content; + + // 发送数据 + res.write(`data: ${JSON.stringify(parsedData)}\n\n`); + } catch (e) { + // 如果解析失败,仍然发送原始数据 + res.write(`data: ${data}\n\n`); } } - res.write('data: [DONE]\n\n'); - return true; - } - - if (lastChunk) { - const processedData = processChunk(lastChunk); - if (processedData) { - writeData(processedData); - } - } - - lastChunk = data; - return false; - }; - - stream.on('data', (chunk) => { - const text = chunk.toString(); - buffer += text; - - // 按行分割处理数据 - const lines = buffer.split('\n'); - // 保留最后一个可能不完整的行 - buffer = lines.pop() || ''; - - // 处理完整的行 - for (const line of lines) { - if (handleLine(line.trim())) { - return; - } } }); stream.on('end', () => { // 处理缓冲区中剩余的数据 if (buffer) { - const line = buffer.trim(); - handleLine(line); + if (buffer.startsWith('data: ')) { + const data = buffer.slice(6); + if (data !== '[DONE]') { + try { + const parsedData = JSON.parse(data); + const content = extractContent(parsedData); + if (!lastContent || !isRepeatContent(content, lastContent)) { + res.write(`data: ${JSON.stringify(parsedData)}\n\n`); + } + } catch (e) { + res.write(`data: ${data}\n\n`); + } + } + } } res.end(); }); @@ -112,8 +81,28 @@ export async function handleSSEResponse(response, res, req) { res.end(); }); - // 当请求被客户端终止时清理流 req.on('close', () => { stream.destroy(); }); } + +// 从响应数据中提取实际内容 +function extractContent(parsedData) { + try { + return parsedData.candidates?.[0]?.content?.parts?.[0]?.text || ''; + } catch (e) { + return JSON.stringify(parsedData); + } +} + +// 检查是否是重复内容 +function isRepeatContent(currentContent, lastContent) { + if (!currentContent || !lastContent) return false; + return lastContent.endsWith(currentContent); +} + +export function getApiKeys(req) { + const keyParam = req.query.key || ''; + if (!keyParam) return []; + return keyParam.split(';').filter(Boolean); +}