From a83b0b1dc7e527582fce7ed878c692bbcd0d0f70 Mon Sep 17 00:00:00 2001 From: nka21 Date: Sun, 10 Aug 2025 13:45:05 +0900 Subject: [PATCH] =?UTF-8?q?=E5=8B=95=E3=81=84=E3=81=A6=E3=81=AA=E3=81=84?= =?UTF-8?q?=E3=82=84=E3=81=A4...?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/api/src/routes/websocket.ts | 119 ++++++++++-- apps/api/src/services/conversationsService.ts | 43 ++++- apps/api/src/services/google/gemini-api.ts | 170 ++++++++++++++++++ apps/api/src/types/definitions.ts | 13 +- apps/extension/src/components/ChatArea.tsx | 7 +- .../src/components/ModelResponse.tsx | 13 +- apps/extension/src/globals.css | 20 +++ apps/extension/src/store/chatStore.ts | 169 +++++++++++++++-- 8 files changed, 525 insertions(+), 29 deletions(-) diff --git a/apps/api/src/routes/websocket.ts b/apps/api/src/routes/websocket.ts index 8b764bb..1923711 100644 --- a/apps/api/src/routes/websocket.ts +++ b/apps/api/src/routes/websocket.ts @@ -128,20 +128,119 @@ async function handleChat( messageId: string | undefined, conversationService: ConversationService, ) { - const response = await conversationService.processMessage( - data.sessionId, - data.message, - ); - - const wsResponse: WebSocketResponse = { - type: "chat_response", - data: { message: response }, - messageId, + const abortController = new AbortController(); + let closed = false; + const onClose = () => { + closed = true; + try { + abortController.abort("ws closed"); + } catch {} }; + webSocket.addEventListener("close", onClose); + webSocket.addEventListener("error", onClose); - webSocket.send(JSON.stringify(wsResponse)); + const hardTimeout = setTimeout(() => { + try { + abortController.abort("stream timeout"); + } catch {} + }, 60_000); + + try { + // 1文字ドリッパー(供給は随時append、送信は一定間隔で1文字) + const dripper = createDripper(webSocket, messageId); + + // 真のストリーミング: GeminiのSSEを受け取りつつドリッパーに供給 + const streamed = await conversationService.processMessageStream( + data.sessionId, + data.message, + async (delta) => { + if (closed) return; + dripper.append(delta); + }, + { + signal: abortController.signal as unknown as AbortSignal, + idleMs: 15000, + }, + ); + + // 送信完了まで待つ + await dripper.finish(); + + if (!closed) { + const wsResponse: WebSocketResponse = { + type: "chat_response", + data: { message: streamed }, + messageId, + }; + webSocket.send(JSON.stringify(wsResponse)); + } + } catch (err) { + if (!closed) { + const msg = (err as Error)?.message ?? "stream failed"; + const wsResponse: WebSocketResponse = msg.includes( + "No response from Gemini API (stream)", + ) + ? { + type: "processing", + data: { delta: "" }, + messageId, + } + : { + type: "error", + data: { error: msg }, + messageId, + }; + try { + webSocket.send(JSON.stringify(wsResponse)); + } catch {} + } + } finally { + clearTimeout(hardTimeout); + webSocket.removeEventListener("close", onClose); + webSocket.removeEventListener("error", onClose); + } } +function createDripper(webSocket: WebSocket, messageId?: string) { + let buffer = ""; + let done = false; + let resolveDone: (() => void) | null = null; + const promise = new Promise((resolve) => { + resolveDone = resolve; + }); + const interval = setInterval(() => { + if (buffer.length > 0) { + const ch = buffer[0]; + buffer = buffer.slice(1); + const streaming: WebSocketResponse = { + type: "processing", + data: { delta: ch }, + messageId, + }; + try { + webSocket.send(JSON.stringify(streaming)); + } catch {} + return; + } + if (done) { + clearInterval(interval); + resolveDone?.(); + } + }, 20); + + return { + append(delta: string) { + buffer += delta; + }, + async finish() { + done = true; + await promise; + }, + }; +} + +// Note: sleep is no longer used; keep for potential future tweaks + async function handleSessionCreate( webSocket: WebSocket, messageId: string | undefined, diff --git a/apps/api/src/services/conversationsService.ts b/apps/api/src/services/conversationsService.ts index 48667d8..8023e7b 100644 --- a/apps/api/src/services/conversationsService.ts +++ b/apps/api/src/services/conversationsService.ts @@ -244,7 +244,7 @@ export class ConversationService { .replace("{HISTORY}", historyText) .replace("{USER_MESSAGE}", userMessage); - // 5. Gemini APIで応答生成 + // 5. Gemini APIで応答生成(非ストリーム版) let aiResponse: string; try { aiResponse = await this.geminiClient.generateContent(prompt); @@ -291,6 +291,47 @@ export class ConversationService { return this.saveMessage(sessionId, "ai", aiResponse); } + // 真のストリーミング用(呼び出し側からonDeltaを注入) + public async processMessageStream( + sessionId: string, + userMessage: string, + onDelta: (delta: string) => void | Promise, + opts?: { signal?: AbortSignal; idleMs?: number }, + ): Promise { + // 1. ユーザーメッセージ保存 + await this.saveMessage(sessionId, "user", userMessage); + + // 2. セッションと履歴取得 + const session = await this.getSession(sessionId); + if (!session) throw new Error("セッションが見つかりません"); + + const historyText = session.messages + .map((msg) => `${msg.role}: ${msg.content}`) + .join("\n"); + const prompt = PROMPT_TEMPLATE.replace("{PHASE}", session.phase) + .replace("{HISTORY}", historyText) + .replace("{USER_MESSAGE}", userMessage); + + let full = ""; + try { + // 先に短い「応答準備中...」を送るとUXが向上 + await onDelta(""); + full = await this.geminiClient.streamContent(prompt, onDelta, opts); + } catch (error) { + if ( + (error as Error).message.includes("quota") || + (error as Error).message.includes("rate limit") + ) { + full = + "申し訳ありません。現在APIの利用制限に達しています。しばらく時間をおいてから再度お試しください。"; + } else { + throw error; + } + } + + return this.saveMessage(sessionId, "ai", full); + } + // 要件定義書を生成するメソッド public async generateRequirementsDocFromSession( sessionId: string, diff --git a/apps/api/src/services/google/gemini-api.ts b/apps/api/src/services/google/gemini-api.ts index 1147cb1..6dce5dc 100644 --- a/apps/api/src/services/google/gemini-api.ts +++ b/apps/api/src/services/google/gemini-api.ts @@ -72,4 +72,174 @@ export class GeminiAPIClient { return data.candidates[0].content.parts[0].text; } + + // ストリーミングでテキストを受け取る + async streamContent( + prompt: string, + onDelta: (delta: string) => void | Promise, + opts?: { signal?: AbortSignal; idleMs?: number }, + ): Promise { + const model: string = "gemini-1.5-flash-latest"; + const request: GeminiRequest = { + contents: [ + { + parts: [{ text: prompt }], + }, + ], + }; + + const response = await fetch( + `${this.baseUrl}/${model}:streamGenerateContent?key=${this.apiKey}`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + // 圧縮を避けることでストリームの中間フラッシュを有効にする + "Accept-Encoding": "identity", + Accept: "text/event-stream, application/json", + // Keep-Aliveヒント + Connection: "keep-alive", + }, + body: JSON.stringify(request), + signal: opts?.signal, + }, + ); + + if (!response.ok || !response.body) { + throw new Error( + `Gemini API stream error: ${response.status} ${response.statusText}`, + ); + } + + // Cloudflare WorkersのReadableStreamDefaultReaderはオプション非対応のため通常モード + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + let fullText = ""; + + // アイドルタイムアウト(データ無のまま一定時間で中断) + const idleMs = opts?.idleMs ?? 15000; + let idleTimer: number | undefined; + const resetTimer = () => { + if (idleTimer !== undefined) { + clearTimeout(idleTimer); + } + idleTimer = setTimeout(() => { + try { + opts?.signal?.throwIfAborted?.(); + } catch {} + // AbortSignalに頼れない環境もあるのでreader.cancelを試みる + reader.cancel("idle timeout").catch(() => {}); + }, idleMs) as unknown as number; + }; + + resetTimer(); + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + resetTimer(); + buffer += decoder.decode(value, { stream: true }); + + // SSEは\n\nでイベント区切り、各行は 'data: ...' + let sepIndex: number = buffer.indexOf("\n\n"); + let progressed = false; + while (sepIndex !== -1) { + progressed = true; + const rawEvent = buffer.slice(0, sepIndex); + buffer = buffer.slice(sepIndex + 2); + + const lines = rawEvent.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || !trimmed.startsWith("data:")) continue; + const jsonStr = trimmed.slice(5).trim(); + if (jsonStr === "[DONE]") continue; + try { + const obj = JSON.parse(jsonStr) as GeminiResponse; + const text = obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? ""; + if (text) { + fullText += text; + await onDelta(text); + } + } catch { + // JSONでなければ無視 + } + } + // 次のイベント区切りを検索(必須) + sepIndex = buffer.indexOf("\n\n"); + } + + // JSONLフォーマット(1行1JSON)対応: SSEとして処理されなかった場合に試す + if (!progressed) { + const lastNl = buffer.lastIndexOf("\n"); + if (lastNl !== -1) { + const chunk = buffer.slice(0, lastNl); + buffer = buffer.slice(lastNl + 1); + const lines = chunk.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed || trimmed === "[DONE]") continue; + const jsonStr = trimmed.startsWith("data:") + ? trimmed.slice(5).trim() + : trimmed; + try { + const obj = JSON.parse(jsonStr) as GeminiResponse; + const text = + obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? ""; + if (text) { + fullText += text; + await onDelta(text); + } + } catch { + // ignore parse errors for partial lines + } + } + } + } + } + + // 残りのバッファにイベントがあれば処理(SSE/JSONL両対応) + const tail = buffer.trim(); + if (tail.startsWith("data:")) { + try { + const obj = JSON.parse(tail.slice(5).trim()) as GeminiResponse; + const text = obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? ""; + if (text) { + fullText += text; + await onDelta(text); + } + } catch { + // ignore + } + } else if (tail.startsWith("{")) { + try { + const obj = JSON.parse(tail) as GeminiResponse; + const text = obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? ""; + if (text) { + fullText += text; + await onDelta(text); + } + } catch { + // ignore + } + } + + if (idleTimer !== undefined) clearTimeout(idleTimer); + + // 何も受け取れなかった場合は非ストリームAPIでフォールバック + if (!fullText) { + const fallback = await this.generateContent(prompt); + // 疑似ストリーミングで最低限の体験を維持 + const size = 60; + for (let i = 0; i < fallback.length; i += size) { + const delta = fallback.slice(i, i + size); + // eslint-disable-next-line no-await-in-loop + await onDelta(delta); + } + return fallback; + } + + return fullText; + } } diff --git a/apps/api/src/types/definitions.ts b/apps/api/src/types/definitions.ts index a0d622d..6a7dc5b 100644 --- a/apps/api/src/types/definitions.ts +++ b/apps/api/src/types/definitions.ts @@ -63,6 +63,11 @@ export interface ChatResponseData { message: ConversationMessage; } +// ストリーミング用の増分データ +export interface ProcessingData { + delta: string; +} + export interface GeneratedIssue { title: string; description: string; @@ -85,6 +90,12 @@ export interface PongData { // Websocket応答Message型 export interface WebSocketResponse { type: "chat_response" | "session_created" | "error" | "pong" | "processing"; - data: ChatResponseData | SessionCreatedData | ErrorData | PongData | null; + data: + | ChatResponseData + | SessionCreatedData + | ErrorData + | PongData + | ProcessingData + | null; messageId?: string; } diff --git a/apps/extension/src/components/ChatArea.tsx b/apps/extension/src/components/ChatArea.tsx index 1785565..1c82240 100644 --- a/apps/extension/src/components/ChatArea.tsx +++ b/apps/extension/src/components/ChatArea.tsx @@ -5,6 +5,9 @@ import { useChat } from "@/hooks/useChat"; export const ChatArea = () => { const { messages, isLoading } = useChat(); + const hasStreaming = messages.some( + (m) => m.role === "ai" && m.id.startsWith("stream:"), + ); useEffect(() => { console.log(messages); @@ -17,10 +20,10 @@ export const ChatArea = () => { m.role === "user" ? ( ) : ( - + ), )} - {isLoading && ( + {isLoading && !hasStreaming && (
diff --git a/apps/extension/src/components/ModelResponse.tsx b/apps/extension/src/components/ModelResponse.tsx index 3192a98..8110cec 100644 --- a/apps/extension/src/components/ModelResponse.tsx +++ b/apps/extension/src/components/ModelResponse.tsx @@ -7,11 +7,12 @@ import { useChatStore } from "@/store/chatStore"; import type { Repository } from "@/hooks/api/getRepos"; interface ModelResponseProps { + id?: string; content: string; } export const ModelResponse = memo((props: ModelResponseProps) => { - const { content } = props; + const { id, content } = props; const [showRepositorySelection, setShowRepositorySelection] = useState(false); const [isCreatingIssues, setIsCreatingIssues] = useState(false); @@ -72,7 +73,15 @@ export const ModelResponse = memo((props: ModelResponseProps) => {
prism
-
{displayContent}
+
+ {id?.startsWith("stream:") ? "" : displayContent} +
{shouldShowRepositorySelection && (