Skip to content

Commit 41b0572

Browse files
committed
feat: refactor SSE queue, expand test coverage, fix package metadata
- Extract createSseQueue() helper, eliminating duplicated SSE queue pattern in /v1/chat/completions and /v1/responses streaming branches (closes #34) - Add tests for GET /v1/models happy path, empty providers, and error path (closes #33) - Add tests for POST /v1/responses: happy path, validation, streaming, session.error (closes #32) - Fix package.json description to be provider-agnostic (closes #35) - Add engines field declaring bun >=1.0.0 requirement (closes #35) - Line coverage: 55% -> 89%, function coverage: 83% -> 94%
1 parent cf28bf1 commit 41b0572

3 files changed

Lines changed: 432 additions & 73 deletions

File tree

index.js

Lines changed: 59 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,48 @@ export async function resolveModel(client, requestedModel, providerOverride) {
487487
throw new Error(`Unknown model '${requestedModel}'. Call GET /v1/models to inspect available IDs.`)
488488
}
489489

490+
export function createSseQueue() {
491+
const chunks = []
492+
let resolve = null
493+
let done = false
494+
495+
function enqueue(value) {
496+
chunks.push(value)
497+
if (resolve) {
498+
const r = resolve
499+
resolve = null
500+
r()
501+
}
502+
}
503+
504+
function finish() {
505+
done = true
506+
if (resolve) {
507+
const r = resolve
508+
resolve = null
509+
r()
510+
}
511+
}
512+
513+
async function* generateChunks() {
514+
while (true) {
515+
while (chunks.length > 0) {
516+
yield chunks.shift()
517+
}
518+
if (done) break
519+
await new Promise((r) => {
520+
resolve = r
521+
})
522+
}
523+
// Drain any remaining chunks
524+
while (chunks.length > 0) {
525+
yield chunks.shift()
526+
}
527+
}
528+
529+
return { enqueue, finish, generateChunks }
530+
}
531+
490532
function sseResponse(corsHeadersObj, generator) {
491533
const encoder = new TextEncoder()
492534
const body = new ReadableStream({
@@ -595,18 +637,7 @@ export function createProxyFetchHandler(client) {
595637
const completionID = `chatcmpl_${crypto.randomUUID().replace(/-/g, "")}`
596638
const now = Math.floor(Date.now() / 1000)
597639

598-
const chunks = []
599-
let resolve = null
600-
let done = false
601-
602-
function enqueue(value) {
603-
chunks.push(value)
604-
if (resolve) {
605-
const r = resolve
606-
resolve = null
607-
r()
608-
}
609-
}
640+
const queue = createSseQueue()
610641

611642
async function* generateSse() {
612643
const runPromise = executePromptStreaming(
@@ -622,7 +653,7 @@ export function createProxyFetchHandler(client) {
622653
model: model.id,
623654
choices: [{ index: 0, delta: { role: "assistant", content: delta }, finish_reason: null }],
624655
})
625-
enqueue(`data: ${chunk}\n\n`)
656+
queue.enqueue(`data: ${chunk}\n\n`)
626657
},
627658
)
628659
.then((streamResult) => {
@@ -638,7 +669,7 @@ export function createProxyFetchHandler(client) {
638669
total_tokens: streamResult.tokens.input + streamResult.tokens.output,
639670
},
640671
})
641-
enqueue(`data: ${finalChunk}\n\ndata: [DONE]\n\n`)
672+
queue.enqueue(`data: ${finalChunk}\n\ndata: [DONE]\n\n`)
642673
})
643674
.catch(async (err) => {
644675
const streamError = err instanceof Error ? err.message : String(err)
@@ -649,30 +680,13 @@ export function createProxyFetchHandler(client) {
649680
const errChunk = JSON.stringify({
650681
error: { message: streamError, type: "server_error" },
651682
})
652-
enqueue(`data: ${errChunk}\n\ndata: [DONE]\n\n`)
683+
queue.enqueue(`data: ${errChunk}\n\ndata: [DONE]\n\n`)
653684
})
654685
.finally(() => {
655-
done = true
656-
if (resolve) {
657-
const r = resolve
658-
resolve = null
659-
r()
660-
}
686+
queue.finish()
661687
})
662688

663-
while (true) {
664-
while (chunks.length > 0) {
665-
yield chunks.shift()
666-
}
667-
if (done) break
668-
await new Promise((r) => {
669-
resolve = r
670-
})
671-
}
672-
// Drain any remaining chunks
673-
while (chunks.length > 0) {
674-
yield chunks.shift()
675-
}
689+
yield* queue.generateChunks()
676690

677691
await runPromise
678692
}
@@ -739,25 +753,14 @@ export function createProxyFetchHandler(client) {
739753
const itemID = `msg_${crypto.randomUUID().replace(/-/g, "")}`
740754
const now = Math.floor(Date.now() / 1000)
741755

742-
const chunks = []
743-
let resolve = null
744-
let done = false
745-
746-
function enqueue(value) {
747-
chunks.push(value)
748-
if (resolve) {
749-
const r = resolve
750-
resolve = null
751-
r()
752-
}
753-
}
756+
const queue = createSseQueue()
754757

755758
function sseEvent(eventType, data) {
756759
return `event: ${eventType}\ndata: ${JSON.stringify(data)}\n\n`
757760
}
758761

759762
async function* generateSse() {
760-
enqueue(
763+
queue.enqueue(
761764
sseEvent("response.created", {
762765
type: "response.created",
763766
response: {
@@ -770,7 +773,7 @@ export function createProxyFetchHandler(client) {
770773
},
771774
}),
772775
)
773-
enqueue(
776+
queue.enqueue(
774777
sseEvent("response.output_item.added", {
775778
type: "response.output_item.added",
776779
output_index: 0,
@@ -786,7 +789,7 @@ export function createProxyFetchHandler(client) {
786789
system,
787790
(delta) => {
788791
if (partIndex === 0) {
789-
enqueue(
792+
queue.enqueue(
790793
sseEvent("response.content_part.added", {
791794
type: "response.content_part.added",
792795
item_id: itemID,
@@ -797,7 +800,7 @@ export function createProxyFetchHandler(client) {
797800
)
798801
partIndex++
799802
}
800-
enqueue(
803+
queue.enqueue(
801804
sseEvent("response.output_text.delta", {
802805
type: "response.output_text.delta",
803806
item_id: itemID,
@@ -809,7 +812,7 @@ export function createProxyFetchHandler(client) {
809812
},
810813
)
811814
.then((streamResult) => {
812-
enqueue(
815+
queue.enqueue(
813816
sseEvent("response.output_text.done", {
814817
type: "response.output_text.done",
815818
item_id: itemID,
@@ -818,14 +821,14 @@ export function createProxyFetchHandler(client) {
818821
text: "",
819822
}),
820823
)
821-
enqueue(
824+
queue.enqueue(
822825
sseEvent("response.output_item.done", {
823826
type: "response.output_item.done",
824827
output_index: 0,
825828
item: { id: itemID, type: "message", status: "completed", role: "assistant" },
826829
}),
827830
)
828-
enqueue(
831+
queue.enqueue(
829832
sseEvent("response.completed", {
830833
type: "response.completed",
831834
response: {
@@ -849,7 +852,7 @@ export function createProxyFetchHandler(client) {
849852
error: errMsg,
850853
requestedModel: body.model,
851854
})
852-
enqueue(
855+
queue.enqueue(
853856
sseEvent("response.failed", {
854857
type: "response.failed",
855858
response: {
@@ -863,26 +866,10 @@ export function createProxyFetchHandler(client) {
863866
)
864867
})
865868
.finally(() => {
866-
done = true
867-
if (resolve) {
868-
const r = resolve
869-
resolve = null
870-
r()
871-
}
869+
queue.finish()
872870
})
873871

874-
while (true) {
875-
while (chunks.length > 0) {
876-
yield chunks.shift()
877-
}
878-
if (done) break
879-
await new Promise((r) => {
880-
resolve = r
881-
})
882-
}
883-
while (chunks.length > 0) {
884-
yield chunks.shift()
885-
}
872+
yield* queue.generateChunks()
886873

887874
await runPromise
888875
}

0 commit comments

Comments
 (0)