From e570c8f022278e77102c3da2c565a25905a051d9 Mon Sep 17 00:00:00 2001 From: anandgupta42 <93243293+anandgupta42@users.noreply.github.com> Date: Sun, 22 Mar 2026 07:49:43 -0700 Subject: [PATCH] test: AsyncQueue/work utility and State.invalidate coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add unit tests for AsyncQueue and work() concurrency utility (queue.ts) and State.invalidate (altimate_change in state.ts) — both were completely untested. These tests mitigate risk of race conditions in streaming and stale state after config invalidation. Co-Authored-By: Claude Opus 4.6 (1M context) https://claude.ai/code/session_01AkMKqcoyJ1vURZ4crtpEZu --- packages/opencode/test/project/state.test.ts | 29 +++++ packages/opencode/test/util/queue.test.ts | 108 +++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 packages/opencode/test/util/queue.test.ts diff --git a/packages/opencode/test/project/state.test.ts b/packages/opencode/test/project/state.test.ts index c1a6dab315..7a4a7e60e9 100644 --- a/packages/opencode/test/project/state.test.ts +++ b/packages/opencode/test/project/state.test.ts @@ -1,6 +1,7 @@ import { afterEach, expect, test } from "bun:test" import { Instance } from "../../src/project/instance" +import { State } from "../../src/project/state" import { tmpdir } from "../fixture/fixture" afterEach(async () => { @@ -113,3 +114,31 @@ test("Instance.state dedupes concurrent promise initialization", async () => { expect(a).toBe(b) expect(n).toBe(1) }) + +test("State.invalidate removes cached entry for re-initialization", async () => { + await using tmp = await tmpdir() + let n = 0 + const init = () => ({ n: ++n }) + const state = Instance.state(init) + + const a = await Instance.provide({ + directory: tmp.path, + fn: async () => state(), + }) + expect(a.n).toBe(1) + + // Invalidate the cached entry so next access re-initializes + State.invalidate(tmp.path, init) + + const b = await Instance.provide({ + directory: tmp.path, + fn: async () => state(), + }) + expect(b.n).toBe(2) + expect(a).not.toBe(b) +}) + +test("State.invalidate on nonexistent key is a no-op", () => { + // Should not throw + State.invalidate("/nonexistent/path", () => {}) +}) diff --git a/packages/opencode/test/util/queue.test.ts b/packages/opencode/test/util/queue.test.ts new file mode 100644 index 0000000000..fd237adca6 --- /dev/null +++ b/packages/opencode/test/util/queue.test.ts @@ -0,0 +1,108 @@ +import { describe, test, expect } from "bun:test" +import { AsyncQueue, work } from "../../src/util/queue" + +describe("AsyncQueue", () => { + test("push before next resolves immediately", async () => { + const q = new AsyncQueue() + q.push(1) + q.push(2) + const a = await q.next() + const b = await q.next() + expect(a).toBe(1) + expect(b).toBe(2) + }) + + test("next before push waits for value", async () => { + const q = new AsyncQueue() + const promise = q.next() + q.push("hello") + expect(await promise).toBe("hello") + }) + + test("multiple waiters resolve in order", async () => { + const q = new AsyncQueue() + const p1 = q.next() + const p2 = q.next() + q.push(10) + q.push(20) + expect(await p1).toBe(10) + expect(await p2).toBe(20) + }) + + test("async iterator yields pushed values", async () => { + const q = new AsyncQueue() + const collected: number[] = [] + + q.push(1) + q.push(2) + q.push(3) + + let count = 0 + for await (const val of q) { + collected.push(val) + count++ + if (count === 3) break + } + expect(collected).toEqual([1, 2, 3]) + }) + + test("interleaved push and next", async () => { + const q = new AsyncQueue() + q.push(1) + expect(await q.next()).toBe(1) + const p = q.next() // waiting + q.push(2) + expect(await p).toBe(2) + q.push(3) + q.push(4) + expect(await q.next()).toBe(3) + expect(await q.next()).toBe(4) + }) +}) + +describe("work", () => { + test("processes all items", async () => { + const results: number[] = [] + await work(2, [1, 2, 3, 4, 5], async (item) => { + results.push(item) + }) + expect(results.sort()).toEqual([1, 2, 3, 4, 5]) + }) + + test("respects concurrency limit", async () => { + let active = 0 + let maxActive = 0 + await work(2, [1, 2, 3, 4, 5], async () => { + active++ + maxActive = Math.max(maxActive, active) + await Bun.sleep(10) + active-- + }) + expect(maxActive).toBeLessThanOrEqual(2) + }) + + test("handles empty items array", async () => { + let called = false + await work(3, [], async () => { + called = true + }) + expect(called).toBe(false) + }) + + test("concurrency of 1 processes sequentially (LIFO due to pop)", async () => { + const order: number[] = [] + await work(1, [1, 2, 3], async (item) => { + order.push(item) + }) + // work() uses pending.pop(), so items are processed in reverse order + expect(order).toEqual([3, 2, 1]) + }) + + test("propagates errors from worker", async () => { + await expect( + work(2, [1, 2, 3], async (item) => { + if (item === 2) throw new Error("boom") + }), + ).rejects.toThrow("boom") + }) +})