diff --git a/index.js b/index.js index 3c905fb..c9c1918 100644 --- a/index.js +++ b/index.js @@ -487,6 +487,48 @@ export async function resolveModel(client, requestedModel, providerOverride) { throw new Error(`Unknown model '${requestedModel}'. Call GET /v1/models to inspect available IDs.`) } +export function createSseQueue() { + const chunks = [] + let resolve = null + let done = false + + function enqueue(value) { + chunks.push(value) + if (resolve) { + const r = resolve + resolve = null + r() + } + } + + function finish() { + done = true + if (resolve) { + const r = resolve + resolve = null + r() + } + } + + async function* generateChunks() { + while (true) { + while (chunks.length > 0) { + yield chunks.shift() + } + if (done) break + await new Promise((r) => { + resolve = r + }) + } + // Drain any remaining chunks + while (chunks.length > 0) { + yield chunks.shift() + } + } + + return { enqueue, finish, generateChunks } +} + function sseResponse(corsHeadersObj, generator) { const encoder = new TextEncoder() const body = new ReadableStream({ @@ -595,18 +637,7 @@ export function createProxyFetchHandler(client) { const completionID = `chatcmpl_${crypto.randomUUID().replace(/-/g, "")}` const now = Math.floor(Date.now() / 1000) - const chunks = [] - let resolve = null - let done = false - - function enqueue(value) { - chunks.push(value) - if (resolve) { - const r = resolve - resolve = null - r() - } - } + const queue = createSseQueue() async function* generateSse() { const runPromise = executePromptStreaming( @@ -622,7 +653,7 @@ export function createProxyFetchHandler(client) { model: model.id, choices: [{ index: 0, delta: { role: "assistant", content: delta }, finish_reason: null }], }) - enqueue(`data: ${chunk}\n\n`) + queue.enqueue(`data: ${chunk}\n\n`) }, ) .then((streamResult) => { @@ -638,7 +669,7 @@ export function createProxyFetchHandler(client) { total_tokens: streamResult.tokens.input + streamResult.tokens.output, }, }) - enqueue(`data: ${finalChunk}\n\ndata: [DONE]\n\n`) + queue.enqueue(`data: ${finalChunk}\n\ndata: [DONE]\n\n`) }) .catch(async (err) => { const streamError = err instanceof Error ? err.message : String(err) @@ -649,30 +680,13 @@ export function createProxyFetchHandler(client) { const errChunk = JSON.stringify({ error: { message: streamError, type: "server_error" }, }) - enqueue(`data: ${errChunk}\n\ndata: [DONE]\n\n`) + queue.enqueue(`data: ${errChunk}\n\ndata: [DONE]\n\n`) }) .finally(() => { - done = true - if (resolve) { - const r = resolve - resolve = null - r() - } + queue.finish() }) - while (true) { - while (chunks.length > 0) { - yield chunks.shift() - } - if (done) break - await new Promise((r) => { - resolve = r - }) - } - // Drain any remaining chunks - while (chunks.length > 0) { - yield chunks.shift() - } + yield* queue.generateChunks() await runPromise } @@ -739,25 +753,14 @@ export function createProxyFetchHandler(client) { const itemID = `msg_${crypto.randomUUID().replace(/-/g, "")}` const now = Math.floor(Date.now() / 1000) - const chunks = [] - let resolve = null - let done = false - - function enqueue(value) { - chunks.push(value) - if (resolve) { - const r = resolve - resolve = null - r() - } - } + const queue = createSseQueue() function sseEvent(eventType, data) { return `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n` } async function* generateSse() { - enqueue( + queue.enqueue( sseEvent("response.created", { type: "response.created", response: { @@ -770,7 +773,7 @@ export function createProxyFetchHandler(client) { }, }), ) - enqueue( + queue.enqueue( sseEvent("response.output_item.added", { type: "response.output_item.added", output_index: 0, @@ -786,7 +789,7 @@ export function createProxyFetchHandler(client) { system, (delta) => { if (partIndex === 0) { - enqueue( + queue.enqueue( sseEvent("response.content_part.added", { type: "response.content_part.added", item_id: itemID, @@ -797,7 +800,7 @@ export function createProxyFetchHandler(client) { ) partIndex++ } - enqueue( + queue.enqueue( sseEvent("response.output_text.delta", { type: "response.output_text.delta", item_id: itemID, @@ -809,7 +812,7 @@ export function createProxyFetchHandler(client) { }, ) .then((streamResult) => { - enqueue( + queue.enqueue( sseEvent("response.output_text.done", { type: "response.output_text.done", item_id: itemID, @@ -818,14 +821,14 @@ export function createProxyFetchHandler(client) { text: "", }), ) - enqueue( + queue.enqueue( sseEvent("response.output_item.done", { type: "response.output_item.done", output_index: 0, item: { id: itemID, type: "message", status: "completed", role: "assistant" }, }), ) - enqueue( + queue.enqueue( sseEvent("response.completed", { type: "response.completed", response: { @@ -849,7 +852,7 @@ export function createProxyFetchHandler(client) { error: errMsg, requestedModel: body.model, }) - enqueue( + queue.enqueue( sseEvent("response.failed", { type: "response.failed", response: { @@ -863,26 +866,10 @@ export function createProxyFetchHandler(client) { ) }) .finally(() => { - done = true - if (resolve) { - const r = resolve - resolve = null - r() - } + queue.finish() }) - while (true) { - while (chunks.length > 0) { - yield chunks.shift() - } - if (done) break - await new Promise((r) => { - resolve = r - }) - } - while (chunks.length > 0) { - yield chunks.shift() - } + yield* queue.generateChunks() await runPromise } diff --git a/index.test.js b/index.test.js index 264f53c..f26b033 100644 --- a/index.test.js +++ b/index.test.js @@ -3,6 +3,7 @@ import assert from "node:assert/strict" import { createProxyFetchHandler, + createSseQueue, toTextContent, normalizeMessages, normalizeResponseInput, @@ -762,3 +763,371 @@ describe("resolveModel", () => { assert.equal(model.modelID, "gpt-4o-mini") }) }) + +// --------------------------------------------------------------------------- +// Unit: createSseQueue +// --------------------------------------------------------------------------- +describe("createSseQueue", () => { + it("enqueue followed by generateChunks yields the value", async () => { + const queue = createSseQueue() + queue.enqueue("hello") + queue.finish() + const results = [] + for await (const chunk of queue.generateChunks()) { + results.push(chunk) + } + assert.deepEqual(results, ["hello"]) + }) + + it("multiple enqueues before finish yields all values in order", async () => { + const queue = createSseQueue() + queue.enqueue("a") + queue.enqueue("b") + queue.enqueue("c") + queue.finish() + const results = [] + for await (const chunk of queue.generateChunks()) { + results.push(chunk) + } + assert.deepEqual(results, ["a", "b", "c"]) + }) + + it("finish with no enqueues yields nothing", async () => { + const queue = createSseQueue() + queue.finish() + const results = [] + for await (const chunk of queue.generateChunks()) { + results.push(chunk) + } + assert.deepEqual(results, []) + }) + + it("enqueue after generateChunks starts still yields the value", async () => { + const queue = createSseQueue() + // Start consuming before anything is enqueued + const generatorPromise = (async () => { + const results = [] + for await (const chunk of queue.generateChunks()) { + results.push(chunk) + } + return results + })() + // Enqueue asynchronously + await Promise.resolve() + queue.enqueue("late") + queue.finish() + const results = await generatorPromise + assert.deepEqual(results, ["late"]) + }) +}) + +// --------------------------------------------------------------------------- +// Integration: GET /v1/models +// --------------------------------------------------------------------------- + +function createModelsClient(providers = []) { + return { + app: { log: async () => {} }, + config: { + providers: async () => ({ data: { providers } }), + }, + } +} + +test("GET /v1/models returns model list", async () => { + const client = createModelsClient([ + { + id: "openai", + models: { + "gpt-4o": { id: "gpt-4o", name: "GPT-4o" }, + "gpt-4o-mini": { id: "gpt-4o-mini", name: "GPT-4o Mini" }, + }, + }, + { + id: "anthropic", + models: { + "claude-3-5-sonnet": { id: "claude-3-5-sonnet", name: "Claude 3.5 Sonnet" }, + }, + }, + ]) + const handler = createProxyFetchHandler(client) + const request = new Request("http://127.0.0.1:4010/v1/models") + + const response = await handler(request) + const body = await response.json() + + assert.equal(response.status, 200) + assert.equal(body.object, "list") + assert.ok(Array.isArray(body.data)) + assert.equal(body.data.length, 3) + + const ids = body.data.map((m) => m.id) + assert.ok(ids.includes("openai/gpt-4o")) + assert.ok(ids.includes("openai/gpt-4o-mini")) + assert.ok(ids.includes("anthropic/claude-3-5-sonnet")) + + const first = body.data[0] + assert.equal(first.object, "model") + assert.ok("owned_by" in first) + assert.ok("created" in first) +}) + +test("GET /v1/models returns empty list when no providers configured", async () => { + const handler = createProxyFetchHandler(createModelsClient([])) + const request = new Request("http://127.0.0.1:4010/v1/models") + + const response = await handler(request) + const body = await response.json() + + assert.equal(response.status, 200) + assert.deepEqual(body, { object: "list", data: [] }) +}) + +test("GET /v1/models returns 500 when providers call throws", async () => { + const client = { + app: { log: async () => {} }, + config: { + providers: async () => { + throw new Error("upstream failure") + }, + }, + } + const handler = createProxyFetchHandler(client) + const request = new Request("http://127.0.0.1:4010/v1/models") + + const response = await handler(request) + const body = await response.json() + + assert.equal(response.status, 500) + assert.equal(body.error.type, "server_error") +}) + +// --------------------------------------------------------------------------- +// Integration: POST /v1/responses +// --------------------------------------------------------------------------- + +function createResponsesClient(responseContent = "The answer is 42.") { + return { + app: { log: async () => {} }, + tool: { ids: async () => ({ data: [] }) }, + config: { + providers: async () => ({ + data: { + providers: [ + { + id: "anthropic", + models: { "claude-3-5-sonnet": { id: "claude-3-5-sonnet", name: "Claude 3.5 Sonnet" } }, + }, + ], + }, + }), + }, + session: { + create: async () => ({ data: { id: "sess-resp-1" } }), + prompt: async () => ({ + data: { + parts: [{ type: "text", text: responseContent }], + info: { tokens: { input: 20, output: 8, reasoning: 0, cache: { read: 0, write: 0 } }, finish: "end_turn" }, + }, + }), + }, + } +} + +test("POST /v1/responses returns a well-formed response object", async () => { + const handler = createProxyFetchHandler(createResponsesClient("Hello from Claude.")) + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + model: "anthropic/claude-3-5-sonnet", + input: "Say hello.", + }), + }) + + const response = await handler(request) + const body = await response.json() + + assert.equal(response.status, 200) + assert.equal(body.object, "response") + assert.equal(body.status, "completed") + assert.ok(body.id.startsWith("resp_")) + assert.equal(body.output_text, "Hello from Claude.") + assert.ok(Array.isArray(body.output)) + assert.equal(body.output[0].role, "assistant") + assert.equal(body.usage.input_tokens, 20) + assert.equal(body.usage.output_tokens, 8) + assert.equal(body.usage.total_tokens, 28) +}) + +test("POST /v1/responses missing model returns 400", async () => { + const handler = createProxyFetchHandler(createResponsesClient()) + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ input: "hi" }), + }) + + const response = await handler(request) + const body = await response.json() + + assert.equal(response.status, 400) + assert.ok(body.error.message.includes("model")) +}) + +test("POST /v1/responses empty input returns 400", async () => { + const handler = createProxyFetchHandler(createResponsesClient()) + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ model: "anthropic/claude-3-5-sonnet", input: " " }), + }) + + const response = await handler(request) + const body = await response.json() + + assert.equal(response.status, 400) + assert.ok(body.error.message.includes("input")) +}) + +test("POST /v1/responses malformed JSON returns 400", async () => { + const handler = createProxyFetchHandler(createResponsesClient()) + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: "{ bad json", + }) + + const response = await handler(request) + + assert.equal(response.status, 400) +}) + +test("POST /v1/responses unknown model returns 502", async () => { + const handler = createProxyFetchHandler(createModelsClient([])) // no providers + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ model: "nonexistent", input: "hi" }), + }) + + const response = await handler(request) + const body = await response.json() + + assert.equal(response.status, 502) + assert.ok(body.error.message.includes("nonexistent")) +}) + +test("POST /v1/responses instructions field is incorporated", async () => { + let capturedSystem = null + const client = { + app: { log: async () => {} }, + tool: { ids: async () => ({ data: [] }) }, + config: { + providers: async () => ({ + data: { + providers: [{ id: "anthropic", models: { "claude-3-5-sonnet": { id: "claude-3-5-sonnet" } } }], + }, + }), + }, + session: { + create: async () => ({ data: { id: "sess-instr" } }), + prompt: async ({ body }) => { + capturedSystem = body.system + return { + data: { + parts: [{ type: "text", text: "ok" }], + info: { tokens: { input: 1, output: 1, reasoning: 0, cache: { read: 0, write: 0 } }, finish: "end_turn" }, + }, + } + }, + }, + } + + const handler = createProxyFetchHandler(client) + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + model: "anthropic/claude-3-5-sonnet", + input: "What is 2+2?", + instructions: "You are a math tutor.", + }), + }) + + await handler(request) + assert.ok(capturedSystem?.includes("You are a math tutor.")) +}) + +test("POST /v1/responses stream: true returns SSE lifecycle events", async () => { + const events = [ + { + type: "message.part.updated", + properties: { + part: { sessionID: "sess-123", type: "text" }, + delta: "The answer", + }, + }, + { + type: "message.part.updated", + properties: { + part: { sessionID: "sess-123", type: "text" }, + delta: " is 42.", + }, + }, + { type: "session.idle", properties: { sessionID: "sess-123" } }, + ] + + const handler = createProxyFetchHandler(createStreamingClient(events)) + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + model: "gpt-4o", + stream: true, + input: "What is 6 times 7?", + }), + }) + + const response = await handler(request) + + assert.equal(response.status, 200) + assert.ok(response.headers.get("content-type")?.includes("text/event-stream")) + + const text = await response.text() + assert.ok(text.includes("response.created")) + assert.ok(text.includes("response.output_text.delta")) + assert.ok(text.includes("The answer")) + assert.ok(text.includes(" is 42.")) + assert.ok(text.includes("response.completed")) +}) + +test("POST /v1/responses stream: true with session.error emits response.failed", async () => { + const events = [ + { + type: "session.error", + properties: { + sessionID: "sess-123", + error: { message: "Rate limit exceeded" }, + }, + }, + { type: "session.idle", properties: { sessionID: "sess-123" } }, + ] + + const handler = createProxyFetchHandler(createStreamingClient(events)) + const request = new Request("http://127.0.0.1:4010/v1/responses", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ + model: "gpt-4o", + stream: true, + input: "hi", + }), + }) + + const response = await handler(request) + assert.equal(response.status, 200) + + const text = await response.text() + assert.ok(text.includes("response.failed") || text.includes("Rate limit exceeded")) +}) diff --git a/package.json b/package.json index 8a81f0b..3302a71 100644 --- a/package.json +++ b/package.json @@ -1,9 +1,12 @@ { "name": "opencode-llm-proxy", "version": "1.4.0", - "description": "OpenCode plugin that exposes an OpenAI-compatible HTTP proxy backed by your OpenCode providers", + "description": "OpenCode plugin that exposes an OpenAI-compatible HTTP proxy backed by any LLM provider configured in OpenCode", "main": "index.js", "type": "module", + "engines": { + "bun": ">=1.0.0" + }, "scripts": { "test": "node --test --experimental-test-coverage", "lint": "eslint .",