Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 109 additions & 10 deletions apps/api/src/routes/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,119 @@ async function handleChat(
messageId: string | undefined,
conversationService: ConversationService,
) {
const response = await conversationService.processMessage(
data.sessionId,
data.message,
);

const wsResponse: WebSocketResponse = {
type: "chat_response",
data: { message: response },
messageId,
const abortController = new AbortController();
let closed = false;
const onClose = () => {
closed = true;
try {
abortController.abort("ws closed");
} catch {}
};
webSocket.addEventListener("close", onClose);
webSocket.addEventListener("error", onClose);

webSocket.send(JSON.stringify(wsResponse));
const hardTimeout = setTimeout(() => {
try {
abortController.abort("stream timeout");
} catch {}
}, 60_000);

try {
// 1文字ドリッパー(供給は随時append、送信は一定間隔で1文字)
const dripper = createDripper(webSocket, messageId);

// 真のストリーミング: GeminiのSSEを受け取りつつドリッパーに供給
const streamed = await conversationService.processMessageStream(
data.sessionId,
data.message,
async (delta) => {
if (closed) return;
dripper.append(delta);
},
{
signal: abortController.signal as unknown as AbortSignal,
idleMs: 15000,
},
);

// 送信完了まで待つ
await dripper.finish();

if (!closed) {
const wsResponse: WebSocketResponse = {
type: "chat_response",
data: { message: streamed },
messageId,
};
webSocket.send(JSON.stringify(wsResponse));
}
} catch (err) {
if (!closed) {
const msg = (err as Error)?.message ?? "stream failed";
const wsResponse: WebSocketResponse = msg.includes(
"No response from Gemini API (stream)",
)
? {
type: "processing",
data: { delta: "" },
messageId,
}
: {
type: "error",
data: { error: msg },
messageId,
};
try {
webSocket.send(JSON.stringify(wsResponse));
} catch {}
}
} finally {
clearTimeout(hardTimeout);
webSocket.removeEventListener("close", onClose);
webSocket.removeEventListener("error", onClose);
}
}

function createDripper(webSocket: WebSocket, messageId?: string) {
let buffer = "";
let done = false;
let resolveDone: (() => void) | null = null;
const promise = new Promise<void>((resolve) => {
resolveDone = resolve;
});
const interval = setInterval(() => {
if (buffer.length > 0) {
const ch = buffer[0];
buffer = buffer.slice(1);
const streaming: WebSocketResponse = {
type: "processing",
data: { delta: ch },
messageId,
};
try {
webSocket.send(JSON.stringify(streaming));
} catch {}
return;
}
if (done) {
clearInterval(interval);
resolveDone?.();
}
}, 20);

return {
append(delta: string) {
buffer += delta;
},
async finish() {
done = true;
await promise;
},
};
}

// Note: sleep is no longer used; keep for potential future tweaks

async function handleSessionCreate(
webSocket: WebSocket,
messageId: string | undefined,
Expand Down
43 changes: 42 additions & 1 deletion apps/api/src/services/conversationsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ export class ConversationService {
.replace("{HISTORY}", historyText)
.replace("{USER_MESSAGE}", userMessage);

// 5. Gemini APIで応答生成
// 5. Gemini APIで応答生成(非ストリーム版)
let aiResponse: string;
try {
aiResponse = await this.geminiClient.generateContent(prompt);
Expand Down Expand Up @@ -291,6 +291,47 @@ export class ConversationService {
return this.saveMessage(sessionId, "ai", aiResponse);
}

// 真のストリーミング用(呼び出し側からonDeltaを注入)
public async processMessageStream(
sessionId: string,
userMessage: string,
onDelta: (delta: string) => void | Promise<void>,
opts?: { signal?: AbortSignal; idleMs?: number },
): Promise<ConversationMessage> {
// 1. ユーザーメッセージ保存
await this.saveMessage(sessionId, "user", userMessage);

// 2. セッションと履歴取得
const session = await this.getSession(sessionId);
if (!session) throw new Error("セッションが見つかりません");

const historyText = session.messages
.map((msg) => `${msg.role}: ${msg.content}`)
.join("\n");
const prompt = PROMPT_TEMPLATE.replace("{PHASE}", session.phase)
.replace("{HISTORY}", historyText)
.replace("{USER_MESSAGE}", userMessage);

let full = "";
try {
// 先に短い「応答準備中...」を送るとUXが向上
await onDelta("");
full = await this.geminiClient.streamContent(prompt, onDelta, opts);
} catch (error) {
if (
(error as Error).message.includes("quota") ||
(error as Error).message.includes("rate limit")
) {
full =
"申し訳ありません。現在APIの利用制限に達しています。しばらく時間をおいてから再度お試しください。";
} else {
throw error;
}
}

return this.saveMessage(sessionId, "ai", full);
}

// 要件定義書を生成するメソッド
public async generateRequirementsDocFromSession(
sessionId: string,
Expand Down
170 changes: 170 additions & 0 deletions apps/api/src/services/google/gemini-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,174 @@ export class GeminiAPIClient {

return data.candidates[0].content.parts[0].text;
}

// ストリーミングでテキストを受け取る
async streamContent(
prompt: string,
onDelta: (delta: string) => void | Promise<void>,
opts?: { signal?: AbortSignal; idleMs?: number },
): Promise<string> {
const model: string = "gemini-1.5-flash-latest";
const request: GeminiRequest = {
contents: [
{
parts: [{ text: prompt }],
},
],
};

const response = await fetch(
`${this.baseUrl}/${model}:streamGenerateContent?key=${this.apiKey}`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
// 圧縮を避けることでストリームの中間フラッシュを有効にする
"Accept-Encoding": "identity",
Accept: "text/event-stream, application/json",
// Keep-Aliveヒント
Connection: "keep-alive",
},
body: JSON.stringify(request),
signal: opts?.signal,
},
);

if (!response.ok || !response.body) {
throw new Error(
`Gemini API stream error: ${response.status} ${response.statusText}`,
);
}

// Cloudflare WorkersのReadableStreamDefaultReaderはオプション非対応のため通常モード
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
let fullText = "";

// アイドルタイムアウト(データ無のまま一定時間で中断)
const idleMs = opts?.idleMs ?? 15000;
let idleTimer: number | undefined;
const resetTimer = () => {
if (idleTimer !== undefined) {
clearTimeout(idleTimer);
}
idleTimer = setTimeout(() => {
try {
opts?.signal?.throwIfAborted?.();
} catch {}
// AbortSignalに頼れない環境もあるのでreader.cancelを試みる
reader.cancel("idle timeout").catch(() => {});
}, idleMs) as unknown as number;
};

resetTimer();

while (true) {
const { done, value } = await reader.read();
if (done) break;
resetTimer();
buffer += decoder.decode(value, { stream: true });

// SSEは\n\nでイベント区切り、各行は 'data: ...'
let sepIndex: number = buffer.indexOf("\n\n");
let progressed = false;
while (sepIndex !== -1) {
progressed = true;
const rawEvent = buffer.slice(0, sepIndex);
buffer = buffer.slice(sepIndex + 2);

const lines = rawEvent.split("\n");
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || !trimmed.startsWith("data:")) continue;
const jsonStr = trimmed.slice(5).trim();
if (jsonStr === "[DONE]") continue;
try {
const obj = JSON.parse(jsonStr) as GeminiResponse;
const text = obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? "";
if (text) {
fullText += text;
await onDelta(text);
}
} catch {
// JSONでなければ無視
}
}
// 次のイベント区切りを検索(必須)
sepIndex = buffer.indexOf("\n\n");
}

// JSONLフォーマット(1行1JSON)対応: SSEとして処理されなかった場合に試す
if (!progressed) {
const lastNl = buffer.lastIndexOf("\n");
if (lastNl !== -1) {
const chunk = buffer.slice(0, lastNl);
buffer = buffer.slice(lastNl + 1);
const lines = chunk.split("\n");
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed === "[DONE]") continue;
const jsonStr = trimmed.startsWith("data:")
? trimmed.slice(5).trim()
: trimmed;
try {
const obj = JSON.parse(jsonStr) as GeminiResponse;
const text =
obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? "";
if (text) {
fullText += text;
await onDelta(text);
}
} catch {
// ignore parse errors for partial lines
}
}
}
}
}

// 残りのバッファにイベントがあれば処理(SSE/JSONL両対応)
const tail = buffer.trim();
if (tail.startsWith("data:")) {
try {
const obj = JSON.parse(tail.slice(5).trim()) as GeminiResponse;
const text = obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? "";
if (text) {
fullText += text;
await onDelta(text);
}
} catch {
// ignore
}
} else if (tail.startsWith("{")) {
try {
const obj = JSON.parse(tail) as GeminiResponse;
const text = obj?.candidates?.[0]?.content?.parts?.[0]?.text ?? "";
if (text) {
fullText += text;
await onDelta(text);
}
} catch {
// ignore
}
}

if (idleTimer !== undefined) clearTimeout(idleTimer);

// 何も受け取れなかった場合は非ストリームAPIでフォールバック
if (!fullText) {
const fallback = await this.generateContent(prompt);
// 疑似ストリーミングで最低限の体験を維持
const size = 60;
for (let i = 0; i < fallback.length; i += size) {
const delta = fallback.slice(i, i + size);
// eslint-disable-next-line no-await-in-loop
await onDelta(delta);
}
return fallback;
}

return fullText;
}
}
13 changes: 12 additions & 1 deletion apps/api/src/types/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ export interface ChatResponseData {
message: ConversationMessage;
}

// ストリーミング用の増分データ
export interface ProcessingData {
delta: string;
}

export interface GeneratedIssue {
title: string;
description: string;
Expand All @@ -85,6 +90,12 @@ export interface PongData {
// Websocket応答Message型
export interface WebSocketResponse {
type: "chat_response" | "session_created" | "error" | "pong" | "processing";
data: ChatResponseData | SessionCreatedData | ErrorData | PongData | null;
data:
| ChatResponseData
| SessionCreatedData
| ErrorData
| PongData
| ProcessingData
| null;
messageId?: string;
}
Loading