From a49d7a7010e3c362a381c2ae08db0a40cdffafba Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:02:37 -0700 Subject: [PATCH 01/15] fix(client): release idle VCS subscriptions immediately --- packages/client-runtime/src/state/runtime.test.ts | 14 ++++++++++++++ packages/client-runtime/src/state/vcs.ts | 1 + 2 files changed, 15 insertions(+) diff --git a/packages/client-runtime/src/state/runtime.test.ts b/packages/client-runtime/src/state/runtime.test.ts index 7584e55d52e..9599b44c3c9 100644 --- a/packages/client-runtime/src/state/runtime.test.ts +++ b/packages/client-runtime/src/state/runtime.test.ts @@ -9,6 +9,7 @@ import * as Layer from "effect/Layer"; import * as Stream from "effect/Stream"; import { AsyncResult, Atom, AtomRegistry } from "effect/unstable/reactivity"; +import { EnvironmentRegistry } from "../connection/registry.ts"; import { environmentRpcKey, createAtomCommandScheduler, @@ -22,6 +23,19 @@ import { settlePromise, squashAtomCommandFailure, } from "./runtime.ts"; +import { createVcsEnvironmentAtoms } from "./vcs.ts"; + +describe("VCS status subscription lifecycle", () => { + it("removes the status subscription immediately after its last consumer unmounts", () => { + const runtime = Atom.runtime(Layer.effect(EnvironmentRegistry, Effect.never)); + const statusAtom = createVcsEnvironmentAtoms(runtime).status({ + environmentId: EnvironmentId.make("env-1"), + input: { cwd: "/repo" }, + }); + + expect(statusAtom.idleTTL).toBe(0); + }); +}); describe("settleAsyncResult", () => { it("preserves successful values and typed failures", async () => { diff --git a/packages/client-runtime/src/state/vcs.ts b/packages/client-runtime/src/state/vcs.ts index 846d0d50609..3231c29d4b5 100644 --- a/packages/client-runtime/src/state/vcs.ts +++ b/packages/client-runtime/src/state/vcs.ts @@ -23,6 +23,7 @@ export function createVcsEnvironmentAtoms( }), status: createEnvironmentSubscriptionAtomFamily(runtime, { label: "environment-data:vcs:status", + idleTtlMs: 0, subscribe: (input: EnvironmentRpcInput) => subscribe(WS_METHODS.subscribeVcsStatus, input).pipe( Stream.mapAccum( From 1d0fc944a270c9245d6bd3f03386ee634918d421 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:06:06 -0700 Subject: [PATCH 02/15] fix(cli): force exit when graceful shutdown stalls --- apps/server/src/bin.ts | 4 + .../src/cli/shutdownSignalEscalation.test.ts | 111 ++++++++++++++++++ .../src/cli/shutdownSignalEscalation.ts | 94 +++++++++++++++ 3 files changed, 209 insertions(+) create mode 100644 apps/server/src/cli/shutdownSignalEscalation.test.ts create mode 100644 apps/server/src/cli/shutdownSignalEscalation.ts diff --git a/apps/server/src/bin.ts b/apps/server/src/bin.ts index ddfbf5e3ecc..9aaf41893c1 100644 --- a/apps/server/src/bin.ts +++ b/apps/server/src/bin.ts @@ -13,6 +13,7 @@ import { hasCloudPublicConfig } from "./cloud/publicConfig.ts"; import { sharedServerCommandFlags } from "./cli/config.ts"; import { projectCommand } from "./cli/project.ts"; import { runServerCommand, serveCommand, startCommand } from "./cli/server.ts"; +import { installNodeShutdownSignalEscalation } from "./cli/shutdownSignalEscalation.ts"; const CliRuntimeLayer = Layer.mergeAll(NodeServices.layer, NetService.layer); @@ -54,9 +55,12 @@ export const makeCli = ({ cloudEnabled = hasCloudPublicConfig } = {}) => export const cli = makeCli(); if (import.meta.main) { + const removeShutdownSignalEscalation = installNodeShutdownSignalEscalation(); + Command.run(cli, { version: packageJson.version }).pipe( Effect.scoped, Effect.provide(CliRuntimeLayer), + Effect.ensuring(Effect.sync(removeShutdownSignalEscalation)), NodeRuntime.runMain, ); } diff --git a/apps/server/src/cli/shutdownSignalEscalation.test.ts b/apps/server/src/cli/shutdownSignalEscalation.test.ts new file mode 100644 index 00000000000..fcb2797bf4b --- /dev/null +++ b/apps/server/src/cli/shutdownSignalEscalation.test.ts @@ -0,0 +1,111 @@ +import { describe, expect, it } from "vite-plus/test"; + +import { + installShutdownSignalEscalation, + type ShutdownSignal, + type ShutdownSignalEscalationOptions, +} from "./shutdownSignalEscalation.ts"; + +const makeHarness = () => { + const listeners: Record void>> = { + SIGINT: new Set(), + SIGTERM: new Set(), + }; + const exitCodes: Array = []; + const timers = new Map void; readonly delayMs: number }>(); + const clearedTimers: Array = []; + let nextTimerHandle = 1; + + const options = { + process: { + addSignalListener: (signal, listener) => { + listeners[signal].add(listener); + }, + removeSignalListener: (signal, listener) => { + listeners[signal].delete(listener); + }, + exit: (code) => { + exitCodes.push(code); + }, + }, + timers: { + setTimeout: (callback, delayMs) => { + const handle = nextTimerHandle; + nextTimerHandle += 1; + timers.set(handle, { callback, delayMs }); + return handle; + }, + clearTimeout: (handle) => { + clearedTimers.push(handle); + timers.delete(handle); + }, + }, + } satisfies ShutdownSignalEscalationOptions; + + const emit = (signal: ShutdownSignal) => { + for (const listener of listeners[signal]) listener(); + }; + + const runNextTimer = () => { + const next = timers.entries().next(); + if (next.done) throw new Error("Expected a pending timer"); + const [handle, timer] = next.value; + timers.delete(handle); + timer.callback(); + }; + + return { clearedTimers, emit, exitCodes, listeners, options, runNextTimer, timers }; +}; + +describe("installShutdownSignalEscalation", () => { + it.each([ + ["SIGINT", 130], + ["SIGTERM", 143], + ] as const)("gives the first %s a bounded graceful-shutdown window", (signal, exitCode) => { + const harness = makeHarness(); + const dispose = installShutdownSignalEscalation({ + ...harness.options, + forceExitAfterMs: 2_500, + }); + + harness.emit(signal); + + expect(harness.exitCodes).toEqual([]); + expect([...harness.timers.values()].map((timer) => timer.delayMs)).toEqual([2_500]); + + harness.runNextTimer(); + + expect(harness.exitCodes).toEqual([exitCode]); + dispose(); + }); + + it("uses the second signal's conventional exit code immediately", () => { + const harness = makeHarness(); + const dispose = installShutdownSignalEscalation(harness.options); + + harness.emit("SIGTERM"); + harness.emit("SIGINT"); + + expect(harness.exitCodes).toEqual([130]); + expect(harness.clearedTimers).toEqual([1]); + expect(harness.timers.size).toBe(0); + dispose(); + }); + + it("removes both listeners and cancels the deadline when disposed", () => { + const harness = makeHarness(); + const dispose = installShutdownSignalEscalation(harness.options); + + harness.emit("SIGINT"); + dispose(); + dispose(); + + expect(harness.listeners.SIGINT.size).toBe(0); + expect(harness.listeners.SIGTERM.size).toBe(0); + expect(harness.clearedTimers).toEqual([1]); + expect(harness.timers.size).toBe(0); + + harness.emit("SIGINT"); + expect(harness.exitCodes).toEqual([]); + }); +}); diff --git a/apps/server/src/cli/shutdownSignalEscalation.ts b/apps/server/src/cli/shutdownSignalEscalation.ts new file mode 100644 index 00000000000..54abfd2931a --- /dev/null +++ b/apps/server/src/cli/shutdownSignalEscalation.ts @@ -0,0 +1,94 @@ +// @effect-diagnostics nodeBuiltinImport:off - This module is the CLI process and signal boundary. +import * as NodeProcess from "node:process"; +import * as NodeTimers from "node:timers"; + +export type ShutdownSignal = "SIGINT" | "SIGTERM"; + +export interface ShutdownSignalProcessHooks { + readonly addSignalListener: (signal: ShutdownSignal, listener: () => void) => void; + readonly removeSignalListener: (signal: ShutdownSignal, listener: () => void) => void; + readonly exit: (code: number) => void; +} + +export interface ShutdownSignalTimerHooks { + readonly setTimeout: (callback: () => void, delayMs: number) => TimerHandle; + readonly clearTimeout: (handle: TimerHandle) => void; +} + +export interface ShutdownSignalEscalationOptions { + readonly process: ShutdownSignalProcessHooks; + readonly timers: ShutdownSignalTimerHooks; + readonly forceExitAfterMs?: number; +} + +const defaultForceExitAfterMs = 10_000; + +const conventionalExitCode = (signal: ShutdownSignal) => (signal === "SIGINT" ? 130 : 143); + +export const installShutdownSignalEscalation = ({ + process: processHooks, + timers, + forceExitAfterMs = defaultForceExitAfterMs, +}: ShutdownSignalEscalationOptions) => { + let firstSignal: ShutdownSignal | undefined; + let forceExitTimer: { readonly handle: TimerHandle } | undefined; + let disposed = false; + + const clearForceExitTimer = () => { + if (forceExitTimer === undefined) return; + timers.clearTimeout(forceExitTimer.handle); + forceExitTimer = undefined; + }; + + const forceExit = (signal: ShutdownSignal) => { + clearForceExitTimer(); + processHooks.exit(conventionalExitCode(signal)); + }; + + const handleSignal = (signal: ShutdownSignal) => { + if (firstSignal !== undefined) { + forceExit(signal); + return; + } + + firstSignal = signal; + forceExitTimer = { + handle: timers.setTimeout(() => { + forceExitTimer = undefined; + processHooks.exit(conventionalExitCode(signal)); + }, forceExitAfterMs), + }; + }; + + const handleSigint = () => handleSignal("SIGINT"); + const handleSigterm = () => handleSignal("SIGTERM"); + + processHooks.addSignalListener("SIGINT", handleSigint); + processHooks.addSignalListener("SIGTERM", handleSigterm); + + return () => { + if (disposed) return; + disposed = true; + processHooks.removeSignalListener("SIGINT", handleSigint); + processHooks.removeSignalListener("SIGTERM", handleSigterm); + clearForceExitTimer(); + }; +}; + +export const installNodeShutdownSignalEscalation = () => + installShutdownSignalEscalation({ + process: { + addSignalListener: (signal, listener) => { + NodeProcess.on(signal, listener); + }, + removeSignalListener: (signal, listener) => { + NodeProcess.removeListener(signal, listener); + }, + exit: (code) => NodeProcess.exit(code), + }, + timers: { + // @effect-diagnostics-next-line globalTimers:off - The forced exit deadline must outlive a stuck Effect shutdown. + setTimeout: (callback, delayMs) => NodeTimers.setTimeout(callback, delayMs), + clearTimeout: (handle) => NodeTimers.clearTimeout(handle), + }, + }); From 39b9fab7a5e4a0df5bed188c59db313fde7c08ae Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:09:53 -0700 Subject: [PATCH 03/15] fix(server): bound provider event log churn --- .../provider/Layers/EventNdjsonLogger.test.ts | 228 +++++- .../src/provider/Layers/EventNdjsonLogger.ts | 712 +++++++++++++----- .../provider/Layers/ProviderEventLoggers.ts | 50 +- 3 files changed, 762 insertions(+), 228 deletions(-) diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts index 71ac7831ed4..6c2bb1f1cd3 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts @@ -5,13 +5,25 @@ import * as NodePath from "node:path"; import { ThreadId } from "@t3tools/contracts"; import { assert, describe, it } from "@effect/vitest"; +import * as Clock from "effect/Clock"; import * as Effect from "effect/Effect"; import * as Logger from "effect/Logger"; import * as Schema from "effect/Schema"; +import * as TestClock from "effect/testing/TestClock"; -import { makeEventNdjsonLogger } from "./EventNdjsonLogger.ts"; +import { makeEventNdjsonLogger, makeEventNdjsonLogStore } from "./EventNdjsonLogger.ts"; const encodeUnknownJson = Schema.encodeUnknownSync(Schema.UnknownFromJsonString); +const decodeOmittedRecord = Schema.decodeUnknownSync( + Schema.fromJsonString( + Schema.Struct({ + _tag: Schema.String, + reason: Schema.String, + originalBytes: Schema.Number, + eventType: Schema.NullOr(Schema.String), + }), + ), +); function parseLogLine(line: string) { const match = /^\[([^\]]+)\] ([A-Z]+): (.+)$/.exec(line); @@ -55,6 +67,7 @@ describe("EventNdjsonLogger", () => { assert.exists(logger); if (!logger) return; yield* logger.write(circular, ThreadId.make("thread-1")); + yield* logger.close(); const serialized = encodeUnknownJson(messages); assert.notInclude(serialized, secret); @@ -190,25 +203,99 @@ describe("EventNdjsonLogger", () => { }), ); + it.effect("shares one thread writer across native and canonical streams", () => + Effect.gen(function* () { + const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "t3-provider-log-")); + const basePath = NodePath.join(tempDir, "events.log"); + + try { + const store = yield* makeEventNdjsonLogStore(basePath, { batchWindowMs: 0 }); + const native = store.logger("native"); + const canonical = store.logger("canonical"); + const threadId = ThreadId.make("thread-shared"); + + yield* native.write({ id: "native-event" }, threadId); + yield* canonical.write({ type: "item.completed", id: "canonical-event" }, threadId); + yield* store.close(); + + const lines = NodeFS.readFileSync(NodePath.join(tempDir, "thread-shared.log"), "utf8") + .trim() + .split("\n") + .map(parseLogLine); + + assert.deepEqual( + lines.map(({ stream, payload }) => ({ stream, payload })), + [ + { stream: "NTIVE", payload: '{"id":"native-event"}' }, + { + stream: "CANON", + payload: '{"type":"item.completed","id":"canonical-event"}', + }, + ], + ); + } finally { + NodeFS.rmSync(tempDir, { recursive: true, force: true }); + } + }), + ); + + it.effect("drops transient canonical events before serialization", () => + Effect.gen(function* () { + const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "t3-provider-log-")); + const basePath = NodePath.join(tempDir, "events.log"); + + try { + const store = yield* makeEventNdjsonLogStore(basePath, { batchWindowMs: 0 }); + const canonical = store.logger("canonical"); + const native = store.logger("native"); + const threadId = ThreadId.make("thread-filtered"); + const circularDelta: Record = { type: "content.delta" }; + circularDelta["self"] = circularDelta; + + yield* canonical.write(circularDelta, threadId); + yield* canonical.write({ type: "item.completed", id: "final" }, threadId); + yield* native.write({ type: "content.delta", id: "native-delta" }, threadId); + yield* store.close(); + + const lines = NodeFS.readFileSync(NodePath.join(tempDir, "thread-filtered.log"), "utf8") + .trim() + .split("\n") + .map(parseLogLine); + + assert.deepEqual( + lines.map(({ stream, payload }) => ({ stream, payload })), + [ + { stream: "CANON", payload: '{"type":"item.completed","id":"final"}' }, + { stream: "NTIVE", payload: '{"type":"content.delta","id":"native-delta"}' }, + ], + ); + } finally { + NodeFS.rmSync(tempDir, { recursive: true, force: true }); + } + }), + ); + it.effect("rotates per-thread files when max size is exceeded", () => Effect.gen(function* () { const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "t3-provider-log-")); const basePath = NodePath.join(tempDir, "provider-native.ndjson"); try { - const logger = yield* makeEventNdjsonLogger(basePath, { - stream: "native", - maxBytes: 120, + const store = yield* makeEventNdjsonLogStore(basePath, { + maxBytes: 512, maxFiles: 2, + batchWindowMs: 0, + maxBufferedBytes: 256, + maxRecordBytes: 256, }); - assert.notEqual(logger, undefined); - if (!logger) { - return; - } + const native = store.logger("native"); + const canonical = store.logger("canonical"); for (let index = 0; index < 10; index += 1) { + const logger = index % 2 === 0 ? native : canonical; yield* logger.write( { + type: "session.started", threadId: "provider-thread-rotate", id: `evt-${index}`, payload: "x".repeat(40), @@ -216,7 +303,7 @@ describe("EventNdjsonLogger", () => { ThreadId.make("thread-rotate"), ); } - yield* logger.close(); + yield* store.close(); const fileStem = "thread-rotate.log"; const matchingFiles = NodeFS.readdirSync(tempDir) @@ -235,6 +322,129 @@ describe("EventNdjsonLogger", () => { matchingFiles.some((entry) => entry === `${fileStem}.3`), false, ); + for (const entry of matchingFiles) { + assert.isAtMost(NodeFS.statSync(NodePath.join(tempDir, entry)).size, 512); + } + } finally { + NodeFS.rmSync(tempDir, { recursive: true, force: true }); + } + }), + ); + + it.effect("replaces oversized records with a bounded omission marker", () => + Effect.gen(function* () { + const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "t3-provider-log-")); + const basePath = NodePath.join(tempDir, "events.log"); + + try { + const store = yield* makeEventNdjsonLogStore(basePath, { + maxBytes: 512, + maxRecordBytes: 256, + maxBufferedBytes: 256, + batchWindowMs: 0, + }); + yield* store + .logger("canonical") + .write( + { type: "item.completed", payload: "x".repeat(2_000) }, + ThreadId.make("thread-oversized"), + ); + yield* store + .logger("canonical") + .write( + { type: "x".repeat(1_000), payload: "x".repeat(2_000) }, + ThreadId.make("thread-oversized-type"), + ); + yield* store.close(); + + const filePath = NodePath.join(tempDir, "thread-oversized.log"); + assert.isAtMost(NodeFS.statSync(filePath).size, 256); + const line = parseLogLine(NodeFS.readFileSync(filePath, "utf8").trim()); + const payload = decodeOmittedRecord(line.payload); + assert.equal(payload._tag, "ProviderEventLogRecordOmitted"); + assert.equal(payload.eventType, "item.completed"); + assert.isAbove(payload.originalBytes, 2_000); + + const longTypeFilePath = NodePath.join(tempDir, "thread-oversized-type.log"); + assert.isAtMost(NodeFS.statSync(longTypeFilePath).size, 256); + const longTypeLine = parseLogLine(NodeFS.readFileSync(longTypeFilePath, "utf8").trim()); + const longTypePayload = decodeOmittedRecord(longTypeLine.payload); + assert.equal(longTypePayload.eventType, null); + } finally { + NodeFS.rmSync(tempDir, { recursive: true, force: true }); + } + }), + ); + + it.effect("enforces aggregate age and byte retention on startup", () => + Effect.gen(function* () { + const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "t3-provider-log-")); + const basePath = NodePath.join(tempDir, "events.log"); + const expiredPath = NodePath.join(tempDir, "expired.log"); + const oldPath = NodePath.join(tempDir, "old.log"); + const newPath = NodePath.join(tempDir, "new.log"); + const ignoredPath = NodePath.join(tempDir, "ignored.txt"); + + try { + yield* TestClock.setTime(1_800_000_000_000); + const now = yield* Clock.currentTimeMillis; + for (const filePath of [expiredPath, oldPath, newPath, ignoredPath]) { + NodeFS.writeFileSync(filePath, "x".repeat(400)); + } + NodeFS.utimesSync(expiredPath, (now - 20_000) / 1_000, (now - 20_000) / 1_000); + NodeFS.utimesSync(oldPath, (now - 5_000) / 1_000, (now - 5_000) / 1_000); + NodeFS.utimesSync(newPath, now / 1_000, now / 1_000); + + const store = yield* makeEventNdjsonLogStore(basePath, { + maxAgeMs: 10_000, + maxTotalBytes: 600, + }); + yield* store.close(); + + assert.equal(NodeFS.existsSync(expiredPath), false); + assert.equal(NodeFS.existsSync(oldPath), false); + assert.equal(NodeFS.existsSync(newPath), true); + assert.equal(NodeFS.existsSync(ignoredPath), true); + } finally { + NodeFS.rmSync(tempDir, { recursive: true, force: true }); + } + }), + ); + + it.effect("keeps aggregate provider logs within the byte budget while writing", () => + Effect.gen(function* () { + const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "t3-provider-log-")); + const basePath = NodePath.join(tempDir, "events.log"); + + try { + const store = yield* makeEventNdjsonLogStore(basePath, { + maxBytes: 512, + maxFiles: 2, + maxTotalBytes: 1_200, + maxBufferedBytes: 256, + maxRecordBytes: 256, + batchWindowMs: 0, + }); + + for (let index = 0; index < 30; index += 1) { + yield* store.logger(index % 2 === 0 ? "native" : "canonical").write( + { + type: "item.completed", + id: `evt-${index}`, + payload: "x".repeat(80), + }, + ThreadId.make(`thread-${index % 4}`), + ); + } + yield* store.close(); + + const totalBytes = NodeFS.readdirSync(tempDir, { withFileTypes: true }) + .filter((entry) => entry.isFile() && /\.log(?:\.\d+)?$/u.test(entry.name)) + .reduce( + (total, entry) => total + NodeFS.statSync(NodePath.join(tempDir, entry.name)).size, + 0, + ); + assert.isAtMost(totalBytes, 1_200); } finally { NodeFS.rmSync(tempDir, { recursive: true, force: true }); } diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index 8c20a4c1936..ab950340eb3 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -1,10 +1,9 @@ // @effect-diagnostics nodeBuiltinImport:off /** - * Provider event logger helper. + * Best-effort provider event logging with one shared writer store. * - * Best-effort writer for observability logs. Each record is formatted as a - * single effect-style text line in a thread-scoped file. Failures are - * downgraded to warnings so provider runtime behavior is unaffected. + * Native and canonical views share batching, rotation, and retention state so + * they cannot race while appending to the same thread-scoped file. */ import * as NodeFS from "node:fs"; import * as NodePath from "node:path"; @@ -12,275 +11,592 @@ import * as NodePath from "node:path"; import type { ThreadId } from "@t3tools/contracts"; import { RotatingFileSink } from "@t3tools/shared/logging"; import { errorTag } from "@t3tools/shared/observability"; +import * as Clock from "effect/Clock"; +import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; import * as Exit from "effect/Exit"; -import * as Logger from "effect/Logger"; import * as Schema from "effect/Schema"; import * as Scope from "effect/Scope"; import * as SynchronizedRef from "effect/SynchronizedRef"; import { toSafeThreadAttachmentSegment } from "../../attachmentStore.ts"; -const DEFAULT_MAX_BYTES = 10 * 1024 * 1024; +const MEBIBYTE = 1024 * 1024; +const DAY_MS = 24 * 60 * 60 * 1_000; +const MIN_RECORD_BYTES = 256; +const DEFAULT_MAX_BYTES = 10 * MEBIBYTE; const DEFAULT_MAX_FILES = 10; -const DEFAULT_BATCH_WINDOW_MS = 200; +const DEFAULT_BATCH_WINDOW_MS = 1_000; +const DEFAULT_MAX_TOTAL_BYTES = 512 * MEBIBYTE; +const DEFAULT_MAX_AGE_MS = 14 * DAY_MS; +const DEFAULT_RETENTION_CHECK_INTERVAL_MS = 5 * 60 * 1_000; +const DEFAULT_MAX_BUFFERED_BYTES = MEBIBYTE; +const DEFAULT_MAX_BUFFERED_RECORDS = 512; +const DEFAULT_MAX_RECORD_BYTES = MEBIBYTE; +const DEFAULT_MAX_CACHED_SINKS = 64; const GLOBAL_THREAD_SEGMENT = "_global"; const LOG_SCOPE = "provider-observability"; +const PROVIDER_LOG_FILE_PATTERN = /\.log(?:\.\d+)?$/u; const encodeUnknownJsonString = Schema.encodeUnknownEffect(Schema.UnknownFromJsonString); +const transientCanonicalEventTypes = new Set([ + "content.delta", + "hook.progress", + "item.updated", + "task.progress", + "thread.realtime.audio.delta", + "tool.progress", + "turn.proposed.delta", +]); + export type EventNdjsonStream = "native" | "canonical" | "orchestration"; export interface EventNdjsonLogger { readonly filePath: string; - write: (event: unknown, threadId: ThreadId | null) => Effect.Effect; - close: () => Effect.Effect; + readonly write: (event: unknown, threadId: ThreadId | null) => Effect.Effect; + readonly close: () => Effect.Effect; } -export interface EventNdjsonLoggerOptions { - readonly stream: EventNdjsonStream; +export interface EventNdjsonLogStore { + readonly filePath: string; + readonly logger: (stream: EventNdjsonStream) => EventNdjsonLogger; + readonly close: () => Effect.Effect; +} + +export interface EventNdjsonLogStoreOptions { readonly maxBytes?: number; readonly maxFiles?: number; readonly batchWindowMs?: number; + readonly maxTotalBytes?: number; + readonly maxAgeMs?: number; + readonly retentionCheckIntervalMs?: number; + readonly maxBufferedBytes?: number; + readonly maxBufferedRecords?: number; + readonly maxRecordBytes?: number; + readonly maxCachedSinks?: number; +} + +export interface EventNdjsonLoggerOptions extends EventNdjsonLogStoreOptions { + readonly stream: EventNdjsonStream; +} + +export class EventNdjsonLogConfigurationError extends Schema.TaggedErrorClass()( + "EventNdjsonLogConfigurationError", + { + filePath: Schema.String, + option: Schema.String, + value: Schema.Number, + minimum: Schema.Number, + }, +) { + override get message(): string { + return `Provider event log option '${this.option}' must be an integer >= ${this.minimum}; received ${this.value} for '${this.filePath}'`; + } +} + +export class EventNdjsonLogDirectoryError extends Schema.TaggedErrorClass()( + "EventNdjsonLogDirectoryError", + { + directory: Schema.String, + cause: Schema.Defect(), + }, +) { + override get message(): string { + return `Failed to create provider event log directory '${this.directory}'`; + } } -interface ThreadWriter { - writeMessage: (message: string) => Effect.Effect; - close: () => Effect.Effect; +export type EventNdjsonLogStoreError = + | EventNdjsonLogConfigurationError + | EventNdjsonLogDirectoryError; + +interface ResolvedOptions { + readonly maxBytes: number; + readonly maxFiles: number; + readonly batchWindowMs: number; + readonly maxTotalBytes: number; + readonly maxAgeMs: number; + readonly retentionCheckIntervalMs: number; + readonly maxBufferedBytes: number; + readonly maxBufferedRecords: number; + readonly maxRecordBytes: number; + readonly maxCachedSinks: number; } -interface LoggerState { - readonly threadWriters: Map; - readonly failedSegments: Set; +interface PendingRecord { + readonly threadSegment: string; + readonly line: string; + readonly bytes: number; } -function logWarning(message: string, context: Record): Effect.Effect { +interface StoreState { + readonly pending: ReadonlyArray; + readonly pendingBytes: number; + readonly sinks: ReadonlyMap; + readonly closed: boolean; + readonly lastRetentionAt: number; + readonly retainedBytesEstimate: number; +} + +interface FileOperationFailure { + readonly filePath: string; + readonly cause: unknown; +} + +interface RetentionResult { + readonly failures: ReadonlyArray; + readonly removedCount: number; + readonly totalBytes: number; +} + +interface DrainResult { + readonly failures: ReadonlyArray; +} + +function logWarning(message: string, context: Record) { return Effect.logWarning(message, context).pipe(Effect.annotateLogs({ scope: LOG_SCOPE })); } -function resolveThreadSegment(raw: string | null | undefined): string { +function resolveThreadSegment(raw: string | null | undefined) { const normalized = typeof raw === "string" ? toSafeThreadAttachmentSegment(raw) : null; return normalized ?? GLOBAL_THREAD_SEGMENT; } -function formatLoggerMessage(message: unknown): string { - if (Array.isArray(message)) { - return message.map((part) => (typeof part === "string" ? part : String(part))).join(" "); +function resolveStreamLabel(stream: EventNdjsonStream) { + return stream === "native" ? "NTIVE" : "CANON"; +} + +function shouldPersist(stream: EventNdjsonStream, event: unknown) { + if (stream !== "canonical" || typeof event !== "object" || event === null) { + return true; } - return typeof message === "string" ? message : String(message); + const type = Reflect.get(event, "type"); + return typeof type !== "string" || !transientCanonicalEventTypes.has(type); } -function makeLineLogger(streamLabel: string): Logger.Logger { - return Logger.make( - ({ date, message }) => - `[${date.toISOString()}] ${streamLabel}: ${formatLoggerMessage(message)}\n`, - ); +function eventType(event: unknown) { + if (typeof event !== "object" || event === null) return null; + const type = Reflect.get(event, "type"); + return typeof type === "string" ? type.slice(0, 128) : null; } -function resolveStreamLabel(stream: EventNdjsonStream): string { - switch (stream) { - case "native": - return "NTIVE"; - case "canonical": - case "orchestration": - default: - return "CANON"; +function formatRecordLine(input: { + readonly stream: EventNdjsonStream; + readonly event: unknown; + readonly observedAt: string; + readonly payload: string; + readonly maxRecordBytes: number; +}) { + const prefix = `[${input.observedAt}] ${resolveStreamLabel(input.stream)}: `; + const line = `${prefix}${input.payload}\n`; + const bytes = Buffer.byteLength(line); + if (bytes <= input.maxRecordBytes) { + return { line, bytes }; + } + + const omittedPayload = (type: string | null) => + JSON.stringify({ + _tag: "ProviderEventLogRecordOmitted", + reason: "record_too_large", + originalBytes: bytes, + eventType: type, + }); + let omittedLine = `${prefix}${omittedPayload(eventType(input.event))}\n`; + if (Buffer.byteLength(omittedLine) > input.maxRecordBytes) { + omittedLine = `${prefix}${omittedPayload(null)}\n`; } + return { + line: omittedLine, + bytes: Buffer.byteLength(omittedLine), + }; } -const toLogMessage = Effect.fn("toLogMessage")(function* ( - event: unknown, -): Effect.fn.Return { - return yield* encodeUnknownJsonString(event).pipe( - Effect.catch((error) => - logWarning("failed to serialize provider event log record", { - errorTag: errorTag(error), - }).pipe(Effect.as(undefined)), - ), - ); -}); +function writeBatchedRecords( + sink: RotatingFileSink, + records: ReadonlyArray, + maxChunkBytes: number, +) { + let pendingLines: Array = []; + let pendingBytes = 0; + + const flush = () => { + if (pendingLines.length === 0) return; + sink.write(pendingLines.join("")); + pendingLines = []; + pendingBytes = 0; + }; + + for (const record of records) { + if (pendingBytes > 0 && pendingBytes + record.bytes > maxChunkBytes) { + flush(); + } + pendingLines.push(record.line); + pendingBytes += record.bytes; + if (pendingBytes >= maxChunkBytes) { + flush(); + } + } + flush(); +} -const makeThreadWriter = Effect.fn("makeThreadWriter")(function* (input: { - readonly filePath: string; - readonly maxBytes: number; - readonly maxFiles: number; - readonly batchWindowMs: number; - readonly streamLabel: string; -}): Effect.fn.Return { - const sinkResult = yield* Effect.sync(() => { +function enforceRetention(input: { + readonly directory: string; + readonly maxTotalBytes: number; + readonly maxAgeMs: number; + readonly now: number; +}): RetentionResult { + const failures: Array = []; + const files: Array<{ filePath: string; mtimeMs: number; size: number }> = []; + + let entries: ReadonlyArray; + try { + entries = NodeFS.readdirSync(input.directory, { withFileTypes: true }); + } catch (cause) { + return { + failures: [{ filePath: input.directory, cause }], + removedCount: 0, + totalBytes: input.maxTotalBytes + 1, + }; + } + + for (const entry of entries) { + if (!entry.isFile() || !PROVIDER_LOG_FILE_PATTERN.test(entry.name)) continue; + const filePath = NodePath.join(input.directory, entry.name); try { - return { - ok: true as const, - sink: new RotatingFileSink({ - filePath: input.filePath, - maxBytes: input.maxBytes, - maxFiles: input.maxFiles, - throwOnError: true, - }), - }; - } catch (error) { - return { ok: false as const, error }; + const stat = NodeFS.statSync(filePath); + files.push({ filePath, mtimeMs: stat.mtimeMs, size: stat.size }); + } catch (cause) { + failures.push({ filePath, cause }); } + } + + let totalBytes = files.reduce((total, file) => total + file.size, 0); + let removedCount = 0; + const remove = (file: (typeof files)[number]) => { + try { + NodeFS.rmSync(file.filePath, { force: true }); + totalBytes -= file.size; + removedCount += 1; + return true; + } catch (cause) { + failures.push({ filePath: file.filePath, cause }); + return false; + } + }; + + const retained = files.filter((file) => { + if (input.now - file.mtimeMs <= input.maxAgeMs) return true; + return !remove(file); }); - if (!sinkResult.ok) { - yield* logWarning("failed to initialize provider thread log file", { - filePath: input.filePath, - errorTag: errorTag(sinkResult.error), - }); - return undefined; + for (const file of retained.toSorted( + (left, right) => left.mtimeMs - right.mtimeMs || left.filePath.localeCompare(right.filePath), + )) { + if (totalBytes <= input.maxTotalBytes) break; + remove(file); } - const sink = sinkResult.sink; - const scope = yield* Scope.make(); - const lineLogger = makeLineLogger(input.streamLabel); - const batchedLogger = yield* Logger.batched(lineLogger, { - window: input.batchWindowMs, - flush: Effect.fn("makeThreadWriter.flush")(function* (messages) { - const flushResult = yield* Effect.sync(() => { - try { - for (const message of messages) { - sink.write(message); - } - return { ok: true as const }; - } catch (error) { - return { ok: false as const, error }; - } - }); + return { failures, removedCount, totalBytes }; +} + +function validateOption(input: { + readonly filePath: string; + readonly option: string; + readonly value: number; + readonly minimum: number; +}) { + if (Number.isInteger(input.value) && input.value >= input.minimum) return undefined; + return new EventNdjsonLogConfigurationError(input); +} + +function resolveOptions(filePath: string, options: EventNdjsonLogStoreOptions) { + const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES; + const maxBufferedBytes = options.maxBufferedBytes ?? DEFAULT_MAX_BUFFERED_BYTES; + const requestedMaxRecordBytes = options.maxRecordBytes ?? DEFAULT_MAX_RECORD_BYTES; + const resolved = { + maxBytes, + maxFiles: options.maxFiles ?? DEFAULT_MAX_FILES, + batchWindowMs: options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS, + maxTotalBytes: options.maxTotalBytes ?? DEFAULT_MAX_TOTAL_BYTES, + maxAgeMs: options.maxAgeMs ?? DEFAULT_MAX_AGE_MS, + retentionCheckIntervalMs: + options.retentionCheckIntervalMs ?? DEFAULT_RETENTION_CHECK_INTERVAL_MS, + maxBufferedBytes, + maxBufferedRecords: options.maxBufferedRecords ?? DEFAULT_MAX_BUFFERED_RECORDS, + maxRecordBytes: Math.min(requestedMaxRecordBytes, maxBytes, maxBufferedBytes), + maxCachedSinks: options.maxCachedSinks ?? DEFAULT_MAX_CACHED_SINKS, + } satisfies ResolvedOptions; + + const validations = [ + ["maxBytes", maxBytes, MIN_RECORD_BYTES], + ["maxFiles", resolved.maxFiles, 1], + ["batchWindowMs", resolved.batchWindowMs, 0], + ["maxTotalBytes", resolved.maxTotalBytes, 1], + ["maxAgeMs", resolved.maxAgeMs, 1], + ["retentionCheckIntervalMs", resolved.retentionCheckIntervalMs, 1], + ["maxBufferedBytes", maxBufferedBytes, MIN_RECORD_BYTES], + ["maxBufferedRecords", resolved.maxBufferedRecords, 1], + ["maxRecordBytes", requestedMaxRecordBytes, MIN_RECORD_BYTES], + ["maxCachedSinks", resolved.maxCachedSinks, 1], + ] as const; + + for (const [option, value, minimum] of validations) { + const error = validateOption({ filePath, option, value, minimum }); + if (error) return Effect.fail(error); + } + return Effect.succeed(resolved); +} + +function drainPending(input: { + readonly directory: string; + readonly options: ResolvedOptions; + readonly state: StoreState; + readonly now: number; + readonly close: boolean; +}): readonly [DrainResult, StoreState] { + if (input.state.closed) { + return [{ failures: [] }, input.state]; + } - if (!flushResult.ok) { - yield* logWarning("provider event log batch flush failed", { - filePath: input.filePath, - errorTag: errorTag(flushResult.error), + let sinks = new Map(input.state.sinks); + const failures: Array = []; + const recordsBySegment = new Map>(); + + for (const record of input.state.pending) { + const records = recordsBySegment.get(record.threadSegment) ?? []; + records.push(record); + recordsBySegment.set(record.threadSegment, records); + } + + let writtenBytes = 0; + for (const [threadSegment, records] of recordsBySegment) { + const filePath = NodePath.join(input.directory, `${threadSegment}.log`); + let sink = sinks.get(threadSegment); + try { + if (!sink) { + sink = new RotatingFileSink({ + filePath, + maxBytes: input.options.maxBytes, + maxFiles: input.options.maxFiles, + throwOnError: true, }); } - }), - }).pipe(Effect.provideService(Scope.Scope, scope)); + sinks.delete(threadSegment); + sinks.set(threadSegment, sink); + while (sinks.size > input.options.maxCachedSinks) { + const oldest = sinks.keys().next().value; + if (oldest === undefined) break; + sinks.delete(oldest); + } + writeBatchedRecords( + sink, + records, + Math.min(input.options.maxBytes, input.options.maxBufferedBytes), + ); + writtenBytes += records.reduce((total, record) => total + record.bytes, 0); + } catch (cause) { + sinks.delete(threadSegment); + failures.push({ filePath, cause }); + } + } - const loggerLayer = Logger.layer([batchedLogger], { mergeWithExisting: false }); + const retainedBytesEstimate = input.state.retainedBytesEstimate + writtenBytes; + const retentionDue = + input.close || + failures.length > 0 || + retainedBytesEstimate > input.options.maxTotalBytes || + input.now - input.state.lastRetentionAt >= input.options.retentionCheckIntervalMs; + const retention = retentionDue + ? enforceRetention({ + directory: input.directory, + maxTotalBytes: input.options.maxTotalBytes, + maxAgeMs: input.options.maxAgeMs, + now: input.now, + }) + : null; + if (retention && retention.removedCount > 0) { + sinks = new Map(); + } - return { - writeMessage(message: string) { - return Effect.log(message).pipe(Effect.provide(loggerLayer)); + return [ + { + failures: [...failures, ...(retention?.failures ?? [])], }, - close() { - return Scope.close(scope, Exit.void); + { + pending: [], + pendingBytes: 0, + sinks, + closed: input.close, + lastRetentionAt: retention ? input.now : input.state.lastRetentionAt, + retainedBytesEstimate: retention?.totalBytes ?? retainedBytesEstimate, }, - } satisfies ThreadWriter; + ]; +} + +const serializeEvent = Effect.fnUntraced(function* (event: unknown) { + return yield* encodeUnknownJsonString(event).pipe( + Effect.catch((error) => + logWarning("failed to serialize provider event log record", { + errorTag: errorTag(error), + }).pipe(Effect.as(undefined)), + ), + ); }); -export const makeEventNdjsonLogger = Effect.fn("makeEventNdjsonLogger")(function* ( +export const makeEventNdjsonLogStore = Effect.fnUntraced(function* ( filePath: string, - options: EventNdjsonLoggerOptions, -): Effect.fn.Return { - const maxBytes = options.maxBytes ?? DEFAULT_MAX_BYTES; - const maxFiles = options.maxFiles ?? DEFAULT_MAX_FILES; - const batchWindowMs = options.batchWindowMs ?? DEFAULT_BATCH_WINDOW_MS; - const streamLabel = resolveStreamLabel(options.stream); - - const directoryReady = yield* Effect.sync(() => { - try { - NodeFS.mkdirSync(NodePath.dirname(filePath), { recursive: true }); - return true; - } catch (error) { - return { ok: false as const, error }; - } + options: EventNdjsonLogStoreOptions = {}, +): Effect.fn.Return { + const resolved = yield* resolveOptions(filePath, options); + const directory = NodePath.dirname(filePath); + + yield* Effect.try({ + try: () => NodeFS.mkdirSync(directory, { recursive: true }), + catch: (cause) => new EventNdjsonLogDirectoryError({ directory, cause }), }); - if (directoryReady !== true) { - yield* logWarning("failed to create provider event log directory", { - filePath, - errorTag: errorTag(directoryReady.error), + + const initializedAt = yield* Clock.currentTimeMillis; + const initialRetention = yield* Effect.sync(() => + enforceRetention({ + directory, + maxTotalBytes: resolved.maxTotalBytes, + maxAgeMs: resolved.maxAgeMs, + now: initializedAt, + }), + ); + for (const failure of initialRetention.failures) { + yield* logWarning("provider event log retention failed", { + filePath: failure.filePath, + errorTag: errorTag(failure.cause), }); - return undefined; } - const stateRef = yield* SynchronizedRef.make({ - threadWriters: new Map(), - failedSegments: new Set(), + const stateRef = yield* SynchronizedRef.make({ + pending: [], + pendingBytes: 0, + sinks: new Map(), + closed: false, + lastRetentionAt: initializedAt, + retainedBytesEstimate: initialRetention.totalBytes, }); + const timerScope = yield* Scope.make(); - const resolveThreadWriter = Effect.fn("resolveThreadWriter")(function* ( - threadSegment: string, - ): Effect.fn.Return { - return yield* SynchronizedRef.modifyEffect(stateRef, (state) => { - if (state.failedSegments.has(threadSegment)) { - return Effect.succeed([undefined, state] as const); - } - - const existing = state.threadWriters.get(threadSegment); - if (existing) { - return Effect.succeed([existing, state] as const); - } + const reportDrainResult = Effect.fnUntraced(function* (result: DrainResult) { + for (const failure of result.failures) { + yield* logWarning("provider event log write or retention failed", { + filePath: failure.filePath, + errorTag: errorTag(failure.cause), + }); + } + }); - return makeThreadWriter({ - filePath: NodePath.join(NodePath.dirname(filePath), `${threadSegment}.log`), - maxBytes, - maxFiles, - batchWindowMs, - streamLabel, - }).pipe( - Effect.map((writer) => { - if (!writer) { - const nextFailedSegments = new Set(state.failedSegments); - nextFailedSegments.add(threadSegment); - return [ - undefined, - { - ...state, - failedSegments: nextFailedSegments, - }, - ] as const; - } - - const nextThreadWriters = new Map(state.threadWriters); - nextThreadWriters.set(threadSegment, writer); - return [ - writer, - { - ...state, - threadWriters: nextThreadWriters, - }, - ] as const; + const flush = Effect.fnUntraced(function* (close: boolean) { + const now = yield* Clock.currentTimeMillis; + const result = yield* SynchronizedRef.modifyEffect(stateRef, (state) => + Effect.sync(() => + drainPending({ + directory, + options: resolved, + state, + now, + close, }), - ); - }); + ), + ); + yield* reportDrainResult(result); }); - const write = Effect.fn("write")(function* (event: unknown, threadId: ThreadId | null) { - const threadSegment = resolveThreadSegment(threadId); - const message = yield* toLogMessage(event); - if (!message) { - return; - } - - const writer = yield* resolveThreadWriter(threadSegment); - if (!writer) { - return; - } + if (resolved.batchWindowMs > 0) { + yield* Effect.forkIn( + Effect.forever(Effect.sleep(resolved.batchWindowMs).pipe(Effect.andThen(flush(false)))), + timerScope, + { startImmediately: true }, + ); + } - yield* writer.writeMessage(message); + const close = Effect.fnUntraced(function* () { + yield* flush(true); + yield* Scope.close(timerScope, Exit.void); }); - const close = Effect.fn("close")(function* () { - yield* SynchronizedRef.modifyEffect(stateRef, (state) => - Effect.gen(function* () { - for (const writer of state.threadWriters.values()) { - yield* writer.close(); + const loggerViews = new Map(); + const logger = (stream: EventNdjsonStream): EventNdjsonLogger => { + const existing = loggerViews.get(stream); + if (existing) return existing; + + const write = Effect.fnUntraced(function* (event: unknown, threadId: ThreadId | null) { + if (!shouldPersist(stream, event)) return; + const payload = yield* serializeEvent(event); + if (payload === undefined) return; + + const observedAt = yield* DateTime.now.pipe(Effect.map(DateTime.formatIso)); + const record = formatRecordLine({ + stream, + event, + observedAt, + payload, + maxRecordBytes: resolved.maxRecordBytes, + }); + const now = yield* Clock.currentTimeMillis; + const result = yield* SynchronizedRef.modifyEffect(stateRef, (state) => { + if (state.closed) { + return Effect.succeed([null, state] as const); } - - return [ - undefined, + const pending = [ + ...state.pending, { - threadWriters: new Map(), - failedSegments: new Set(), + threadSegment: resolveThreadSegment(threadId), + line: record.line, + bytes: record.bytes, }, - ] as const; - }), - ); - }); + ]; + const pendingBytes = state.pendingBytes + record.bytes; + const shouldFlush = + resolved.batchWindowMs === 0 || + pending.length >= resolved.maxBufferedRecords || + pendingBytes >= resolved.maxBufferedBytes; + const nextState = { + ...state, + pending, + pendingBytes, + }; + if (!shouldFlush) { + return Effect.succeed([null, nextState] as const); + } + return Effect.sync(() => { + const [drainResult, drainedState] = drainPending({ + directory, + options: resolved, + state: nextState, + now, + close: false, + }); + return [drainResult, drainedState] as const; + }); + }); + if (result) { + yield* reportDrainResult(result); + } + }); - return { - filePath, - write, - close, - } satisfies EventNdjsonLogger; + const view = { filePath, write, close } satisfies EventNdjsonLogger; + loggerViews.set(stream, view); + return view; + }; + + return { filePath, logger, close } satisfies EventNdjsonLogStore; +}); + +export const makeEventNdjsonLogger = Effect.fnUntraced(function* ( + filePath: string, + options: EventNdjsonLoggerOptions, +): Effect.fn.Return { + const store = yield* makeEventNdjsonLogStore(filePath, options).pipe( + Effect.catch((error) => + logWarning(error.message, { error }).pipe( + Effect.as(undefined), + ), + ), + ); + return store?.logger(options.stream); }); diff --git a/apps/server/src/provider/Layers/ProviderEventLoggers.ts b/apps/server/src/provider/Layers/ProviderEventLoggers.ts index 711aa6e76b6..2ef9644e2ab 100644 --- a/apps/server/src/provider/Layers/ProviderEventLoggers.ts +++ b/apps/server/src/provider/Layers/ProviderEventLoggers.ts @@ -1,6 +1,6 @@ /** - * ProviderEventLoggers — single observability service that owns the two - * shared NDJSON streams the provider runtime writes: + * ProviderEventLoggers — single observability service that owns the shared + * provider event log store and exposes its two runtime views: * * - `native` — provider-protocol events as the SDK emits them, written * from inside each `Adapter` factory. @@ -13,17 +13,14 @@ * not at the boot Layer. There is no longer a single `makeAdapterLive(options)` * call site where we can hand an `EventNdjsonLogger` in by hand. * - Multiple driver instances per kind (`codex_personal`, `codex_work`) - * should share one underlying log writer per stream — opening N writers - * against the same rotating file would race the rotation logic. Owning - * the loggers on a single tag keeps that invariant intact. + * should share one underlying log store — opening N writers against the + * same rotating file would race the rotation logic. Owning the loggers on + * a single tag keeps that invariant intact. * - Tests can swap one (or both) loggers with in-memory recorders by * `Layer.succeed(ProviderEventLoggers, { native, canonical })` instead of * juggling per-Layer option threading. * - * Both fields are optional. `makeEventNdjsonLogger` returns `undefined` when - * the target directory cannot be created; we forward that as `undefined` - * rather than failing the boot Layer, matching the previous best-effort - * behavior of `server.ts`. + * Both fields are optional because observability must not prevent startup. * * @module provider/Layers/ProviderEventLoggers */ @@ -32,7 +29,11 @@ import * as Effect from "effect/Effect"; import * as Layer from "effect/Layer"; import { ServerConfig } from "../../config.ts"; -import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts"; +import { + type EventNdjsonLogger, + type EventNdjsonLogStore, + makeEventNdjsonLogStore, +} from "./EventNdjsonLogger.ts"; export interface ProviderEventLoggersShape { readonly native: EventNdjsonLogger | undefined; @@ -63,23 +64,30 @@ export const NoOpProviderEventLoggers: ProviderEventLoggersShape = { }; /** - * Live Layer that builds both loggers from `ServerConfig.providerEventLogPath`. - * If the directory create fails for either stream, the corresponding field - * is `undefined` and writes from that stream become no-ops downstream. + * Live layer that builds both stream views over one shared store. Setup + * failures are logged and downgraded to the no-op service so diagnostics never + * block startup. The layer scope flushes and closes the store during shutdown. */ export const ProviderEventLoggersLive = Layer.effect( ProviderEventLoggers, Effect.gen(function* () { const { providerEventLogPath } = yield* ServerConfig; - const native = yield* makeEventNdjsonLogger(providerEventLogPath, { - stream: "native", - }); - const canonical = yield* makeEventNdjsonLogger(providerEventLogPath, { - stream: "canonical", - }); + const store = yield* makeEventNdjsonLogStore(providerEventLogPath).pipe( + Effect.catch((error) => + Effect.logWarning(error.message, { error }).pipe( + Effect.annotateLogs({ scope: "provider-observability" }), + Effect.as(undefined), + ), + ), + ); + if (!store) { + return NoOpProviderEventLoggers; + } + + yield* Effect.addFinalizer(() => store.close()); return { - native, - canonical, + native: store.logger("native"), + canonical: store.logger("canonical"), } satisfies ProviderEventLoggersShape; }), ); From 6c0533a23b0fc294149860d3b8ef87d79ea51282 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:14:27 -0700 Subject: [PATCH 04/15] fix(server): harden background Git status refresh --- apps/server/src/git/GitWorkflowService.ts | 7 + apps/server/src/vcs/GitVcsDriver.ts | 1 + apps/server/src/vcs/GitVcsDriverCore.test.ts | 211 +++++++++++++++++- apps/server/src/vcs/GitVcsDriverCore.ts | 162 ++++++++++---- .../src/vcs/VcsStatusBroadcaster.test.ts | 96 ++++++++ apps/server/src/vcs/VcsStatusBroadcaster.ts | 12 +- 6 files changed, 448 insertions(+), 41 deletions(-) diff --git a/apps/server/src/git/GitWorkflowService.ts b/apps/server/src/git/GitWorkflowService.ts index 100b9beadba..d0adff88b35 100644 --- a/apps/server/src/git/GitWorkflowService.ts +++ b/apps/server/src/git/GitWorkflowService.ts @@ -45,6 +45,7 @@ export class GitWorkflowService extends Context.Service< input: VcsStatusInput, options?: GitVcsDriver.GitRemoteStatusOptions, ) => Effect.Effect; + readonly refreshStatusUpstream: (cwd: string) => Effect.Effect; readonly invalidateLocalStatus: (cwd: string) => Effect.Effect; readonly invalidateRemoteStatus: (cwd: string) => Effect.Effect; readonly invalidateStatus: (cwd: string) => Effect.Effect; @@ -270,6 +271,12 @@ export const make = Effect.gen(function* () { isGitRepository ? gitManager.remoteStatus(input, options) : Effect.succeed(null), ), ), + refreshStatusUpstream: (cwd) => + detectGitRepositoryForStatus("GitWorkflowService.refreshStatusUpstream", cwd).pipe( + Effect.flatMap((isGitRepository) => + isGitRepository ? git.refreshStatusUpstream(cwd) : Effect.void, + ), + ), invalidateLocalStatus: gitManager.invalidateLocalStatus, invalidateRemoteStatus: gitManager.invalidateRemoteStatus, invalidateStatus: gitManager.invalidateStatus, diff --git a/apps/server/src/vcs/GitVcsDriver.ts b/apps/server/src/vcs/GitVcsDriver.ts index 55aa8f38835..5281775b515 100644 --- a/apps/server/src/vcs/GitVcsDriver.ts +++ b/apps/server/src/vcs/GitVcsDriver.ts @@ -199,6 +199,7 @@ export class GitVcsDriver extends Context.Service< cwd: string, options?: GitRemoteStatusOptions, ) => Effect.Effect; + readonly refreshStatusUpstream: (cwd: string) => Effect.Effect; readonly prepareCommitContext: ( cwd: string, filePaths?: readonly string[], diff --git a/apps/server/src/vcs/GitVcsDriverCore.test.ts b/apps/server/src/vcs/GitVcsDriverCore.test.ts index dc58fc2543c..4ed52be86d7 100644 --- a/apps/server/src/vcs/GitVcsDriverCore.test.ts +++ b/apps/server/src/vcs/GitVcsDriverCore.test.ts @@ -8,6 +8,7 @@ import * as PlatformError from "effect/PlatformError"; import * as Scope from "effect/Scope"; import { GitCommandError } from "@t3tools/contracts"; +import { HostProcessPlatform } from "@t3tools/shared/hostProcess"; import { ServerConfig } from "../config.ts"; import { splitNullSeparatedGitStdoutPaths } from "./GitVcsDriverCore.ts"; import * as GitVcsDriver from "./GitVcsDriver.ts"; @@ -58,6 +59,20 @@ const git = ( return result.stdout.trim(); }); +const findGitExecutable = Effect.fn("GitVcsDriver.test.findGitExecutable")(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const pathService = yield* Path.Path; + const pathDelimiter = (yield* HostProcessPlatform) === "win32" ? ";" : ":"; + for (const entry of (process.env.PATH ?? "").split(pathDelimiter)) { + if (entry.length === 0) continue; + const candidate = pathService.join(entry, "git"); + if (yield* fileSystem.exists(candidate)) { + return yield* fileSystem.realPath(candidate); + } + } + return yield* Effect.die(new Error("Could not find the Git executable in PATH.")); +}); + const initRepoWithCommit = ( cwd: string, ): Effect.Effect< @@ -148,6 +163,7 @@ it.layer(TestLayer)("GitVcsDriver core integration", (it) => { const cwd = pathService.join(parent, "missing"); const driver = yield* GitVcsDriver.GitVcsDriver; + yield* driver.refreshStatusUpstream(cwd); const [localStatus, remoteStatus, refs] = yield* Effect.all([ driver.statusDetails(cwd), driver.statusDetailsRemote(cwd, { refreshUpstream: false }), @@ -435,8 +451,16 @@ it.layer(TestLayer)("GitVcsDriver core integration", (it) => { process.env.SSH_ASKPASS_REQUIRE = "force"; process.env.T3_TEST_SSH_ASKPASS_LOG = sshLogPath; - yield* (yield* GitVcsDriver.GitVcsDriver).statusDetails(cwd); + const refreshError = yield* (yield* GitVcsDriver.GitVcsDriver) + .refreshStatusUpstream(cwd) + .pipe(Effect.flip); + assert.deepInclude(refreshError, { + _tag: "GitCommandError", + operation: "GitVcsDriver.fetchRemoteForStatus", + cwd, + detail: "Background Git fetch failed.", + }); assert.deepEqual((yield* fileSystem.readFileString(sshLogPath)).trim().split(/\r?\n/), [ "GCM_INTERACTIVE=never", "GIT_ASKPASS=", @@ -461,6 +485,191 @@ it.layer(TestLayer)("GitVcsDriver core integration", (it) => { }), ); + it.effect("fetches status remotes from the worktree with hardened background flags", () => + Effect.gen(function* () { + const platform = yield* HostProcessPlatform; + if (platform === "win32") return; + + const cwd = yield* makeTmpDir(); + const remote = yield* makeTmpDir("git-vcs-driver-remote-"); + const wrapperDir = yield* makeTmpDir("git-vcs-driver-wrapper-"); + const { initialBranch } = yield* initRepoWithCommit(cwd); + const fileSystem = yield* FileSystem.FileSystem; + const pathService = yield* Path.Path; + const realGit = yield* findGitExecutable(); + const wrapperPath = pathService.join(wrapperDir, "git"); + const invocationLogPath = pathService.join(wrapperDir, "fetch-invocation.txt"); + const fetchHeadPath = pathService.join(cwd, ".git", "FETCH_HEAD"); + const previousPath = process.env.PATH; + const previousRealGit = process.env.T3_TEST_REAL_GIT; + const previousInvocationLog = process.env.T3_TEST_FETCH_INVOCATION_LOG; + + yield* git(remote, ["init", "--bare"]); + yield* git(cwd, ["remote", "add", "origin", remote]); + yield* git(cwd, ["push", "-u", "origin", initialBranch]); + yield* fileSystem.writeFileString(fetchHeadPath, "sentinel-fetch-head\n"); + yield* fileSystem.writeFileString( + wrapperPath, + [ + "#!/bin/sh", + "is_fetch=0", + 'for arg in "$@"; do', + ' if [ "$arg" = "fetch" ]; then is_fetch=1; fi', + "done", + 'if [ "$is_fetch" = "1" ]; then', + ' printf "cwd=%s\\n" "$PWD" > "$T3_TEST_FETCH_INVOCATION_LOG"', + ' for arg in "$@"; do', + ' printf "arg=%s\\n" "$arg" >> "$T3_TEST_FETCH_INVOCATION_LOG"', + " done", + "fi", + 'exec "$T3_TEST_REAL_GIT" "$@"', + "", + ].join("\n"), + ); + yield* fileSystem.chmod(wrapperPath, 0o755); + + yield* Effect.gen(function* () { + process.env.PATH = `${wrapperDir}:${previousPath ?? ""}`; + process.env.T3_TEST_REAL_GIT = realGit; + process.env.T3_TEST_FETCH_INVOCATION_LOG = invocationLogPath; + + yield* (yield* GitVcsDriver.GitVcsDriver).refreshStatusUpstream(cwd); + + assert.deepStrictEqual( + (yield* fileSystem.readFileString(invocationLogPath)).trim().split(/\r?\n/), + [ + `cwd=${cwd}`, + "arg=fetch", + "arg=--quiet", + "arg=--no-tags", + "arg=--no-write-fetch-head", + "arg=--no-auto-maintenance", + "arg=--no-recurse-submodules", + "arg=--", + "arg=origin", + ], + ); + assert.equal(yield* fileSystem.readFileString(fetchHeadPath), "sentinel-fetch-head\n"); + }).pipe( + Effect.ensuring( + Effect.sync(() => { + if (previousPath === undefined) delete process.env.PATH; + else process.env.PATH = previousPath; + if (previousRealGit === undefined) delete process.env.T3_TEST_REAL_GIT; + else process.env.T3_TEST_REAL_GIT = previousRealGit; + if (previousInvocationLog === undefined) { + delete process.env.T3_TEST_FETCH_INVOCATION_LOG; + } else { + process.env.T3_TEST_FETCH_INVOCATION_LOG = previousInvocationLog; + } + }), + ), + ); + }), + ); + + it.effect("coalesces matching refreshes and serializes different remotes sharing objects", () => + Effect.gen(function* () { + const platform = yield* HostProcessPlatform; + if (platform === "win32") return; + + const cwd = yield* makeTmpDir(); + const origin = yield* makeTmpDir("git-vcs-driver-origin-"); + const backup = yield* makeTmpDir("git-vcs-driver-backup-"); + const worktreeParent = yield* makeTmpDir("git-vcs-driver-worktrees-"); + const wrapperDir = yield* makeTmpDir("git-vcs-driver-wrapper-"); + const { initialBranch } = yield* initRepoWithCommit(cwd); + const fileSystem = yield* FileSystem.FileSystem; + const pathService = yield* Path.Path; + const worktreeCwd = pathService.join(worktreeParent, "feature"); + const realGit = yield* findGitExecutable(); + const wrapperPath = pathService.join(wrapperDir, "git"); + const invocationLogPath = pathService.join(wrapperDir, "fetches.txt"); + const activeLockPath = pathService.join(wrapperDir, "active-fetch"); + const overlapLogPath = pathService.join(wrapperDir, "overlap.txt"); + const previousPath = process.env.PATH; + const previousRealGit = process.env.T3_TEST_REAL_GIT; + const previousInvocationLog = process.env.T3_TEST_FETCH_INVOCATION_LOG; + const previousActiveLock = process.env.T3_TEST_FETCH_ACTIVE_LOCK; + const previousOverlapLog = process.env.T3_TEST_FETCH_OVERLAP_LOG; + + yield* git(origin, ["init", "--bare"]); + yield* git(backup, ["init", "--bare"]); + yield* git(cwd, ["remote", "add", "origin", origin]); + yield* git(cwd, ["remote", "add", "backup", backup]); + yield* git(cwd, ["push", "-u", "origin", initialBranch]); + yield* git(cwd, ["worktree", "add", "-b", "feature/serialized", worktreeCwd]); + yield* git(worktreeCwd, ["push", "-u", "backup", "feature/serialized"]); + yield* fileSystem.writeFileString( + wrapperPath, + [ + "#!/bin/sh", + "is_fetch=0", + 'for arg in "$@"; do', + ' if [ "$arg" = "fetch" ]; then is_fetch=1; fi', + "done", + 'if [ "$is_fetch" = "1" ]; then', + ' printf "start\\n" >> "$T3_TEST_FETCH_INVOCATION_LOG"', + ' if ! mkdir "$T3_TEST_FETCH_ACTIVE_LOCK" 2>/dev/null; then', + ' printf "overlap\\n" >> "$T3_TEST_FETCH_OVERLAP_LOG"', + " fi", + " sleep 0.2", + ' "$T3_TEST_REAL_GIT" "$@"', + " status=$?", + ' rmdir "$T3_TEST_FETCH_ACTIVE_LOCK" 2>/dev/null || true', + ' printf "end\\n" >> "$T3_TEST_FETCH_INVOCATION_LOG"', + " exit $status", + "fi", + 'exec "$T3_TEST_REAL_GIT" "$@"', + "", + ].join("\n"), + ); + yield* fileSystem.chmod(wrapperPath, 0o755); + + yield* Effect.gen(function* () { + process.env.PATH = `${wrapperDir}:${previousPath ?? ""}`; + process.env.T3_TEST_REAL_GIT = realGit; + process.env.T3_TEST_FETCH_INVOCATION_LOG = invocationLogPath; + process.env.T3_TEST_FETCH_ACTIVE_LOCK = activeLockPath; + process.env.T3_TEST_FETCH_OVERLAP_LOG = overlapLogPath; + + const driver = yield* GitVcsDriver.GitVcsDriver; + yield* Effect.all( + [ + driver.refreshStatusUpstream(cwd), + driver.refreshStatusUpstream(cwd), + driver.refreshStatusUpstream(worktreeCwd), + ], + { concurrency: "unbounded", discard: true }, + ); + + const invocationLines = (yield* fileSystem.readFileString(invocationLogPath)) + .trim() + .split(/\r?\n/); + assert.deepStrictEqual(invocationLines, ["start", "end", "start", "end"]); + assert.equal(yield* fileSystem.exists(overlapLogPath), false); + }).pipe( + Effect.ensuring( + Effect.sync(() => { + if (previousPath === undefined) delete process.env.PATH; + else process.env.PATH = previousPath; + if (previousRealGit === undefined) delete process.env.T3_TEST_REAL_GIT; + else process.env.T3_TEST_REAL_GIT = previousRealGit; + if (previousInvocationLog === undefined) { + delete process.env.T3_TEST_FETCH_INVOCATION_LOG; + } else { + process.env.T3_TEST_FETCH_INVOCATION_LOG = previousInvocationLog; + } + if (previousActiveLock === undefined) delete process.env.T3_TEST_FETCH_ACTIVE_LOCK; + else process.env.T3_TEST_FETCH_ACTIVE_LOCK = previousActiveLock; + if (previousOverlapLog === undefined) delete process.env.T3_TEST_FETCH_OVERLAP_LOG; + else process.env.T3_TEST_FETCH_OVERLAP_LOG = previousOverlapLog; + }), + ), + ); + }), + ); + it.effect("reuses the no-upstream fallback ahead count for default-branch delta", () => Effect.gen(function* () { const cwd = yield* makeTmpDir(); diff --git a/apps/server/src/vcs/GitVcsDriverCore.ts b/apps/server/src/vcs/GitVcsDriverCore.ts index a406cbce549..cc8f17066bb 100644 --- a/apps/server/src/vcs/GitVcsDriverCore.ts +++ b/apps/server/src/vcs/GitVcsDriverCore.ts @@ -1,13 +1,14 @@ import * as Arr from "effect/Array"; import * as Cache from "effect/Cache"; -import * as Data from "effect/Data"; import * as Crypto from "effect/Crypto"; import * as DateTime from "effect/DateTime"; import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; +import * as Equal from "effect/Equal"; import * as Encoding from "effect/Encoding"; import * as Exit from "effect/Exit"; import * as FileSystem from "effect/FileSystem"; +import * as Hash from "effect/Hash"; import * as Option from "effect/Option"; import * as Path from "effect/Path"; import * as PlatformError from "effect/PlatformError"; @@ -48,9 +49,9 @@ const REVIEW_DIFF_PATCH_MAX_OUTPUT_BYTES = 120_000; const REVIEW_UNTRACKED_DIFF_MAX_OUTPUT_BYTES = 80_000; const WORKSPACE_FILES_MAX_OUTPUT_BYTES = 120_000; const STATUS_UPSTREAM_REFRESH_INTERVAL = Duration.seconds(15); -const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5); +const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.minutes(1); -const STATUS_UPSTREAM_REFRESH_FAILURE_COOLDOWN = Duration.seconds(5); +const STATUS_UPSTREAM_REFRESH_FAILURE_COOLDOWN = Duration.seconds(30); const STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY = 2_048; const STATUS_UPSTREAM_REFRESH_ENV = Object.freeze({ GCM_INTERACTIVE: "never", @@ -90,10 +91,34 @@ type TraceTailState = { remainder: string; }; -class StatusRemoteRefreshCacheKey extends Data.Class<{ - gitCommonDir: string; - remoteName: string; -}> {} +class StatusRemoteRefreshCacheKey { + readonly objectDir: string; + readonly remoteName: string; + readonly cwd: string; + + constructor(objectDir: string, remoteName: string, cwd: string) { + this.objectDir = objectDir; + this.remoteName = remoteName; + this.cwd = cwd; + } + + [Equal.symbol](that: unknown) { + return ( + that instanceof StatusRemoteRefreshCacheKey && + this.objectDir === that.objectDir && + this.remoteName === that.remoteName + ); + } + + [Hash.symbol]() { + return Hash.string(`${this.objectDir}\0${this.remoteName}`); + } +} + +interface StatusRefreshLock { + readonly semaphore: Semaphore.Semaphore; + readonly users: number; +} interface ExecuteGitOptions { stdin?: string | undefined; @@ -920,23 +945,26 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* ); }); - const fetchRemoteForStatus = ( - gitCommonDir: string, - remoteName: string, - ): Effect.Effect => { - const fetchCwd = - path.basename(gitCommonDir) === ".git" ? path.dirname(gitCommonDir) : gitCommonDir; - return executeGit( + const fetchRemoteForStatus = (cwd: string, remoteName: string) => + executeGit( "GitVcsDriver.fetchRemoteForStatus", - fetchCwd, - ["--git-dir", gitCommonDir, "fetch", "--quiet", "--no-tags", remoteName], + cwd, + [ + "fetch", + "--quiet", + "--no-tags", + "--no-write-fetch-head", + "--no-auto-maintenance", + "--no-recurse-submodules", + "--", + remoteName, + ], { - allowNonZeroExit: true, env: STATUS_UPSTREAM_REFRESH_ENV, + fallbackErrorDetail: "Background Git fetch failed.", timeoutMs: Duration.toMillis(STATUS_UPSTREAM_REFRESH_TIMEOUT), }, ).pipe(Effect.asVoid); - }; const resolveGitCommonDir = Effect.fn("resolveGitCommonDir")(function* (cwd: string) { const gitCommonDir = yield* runGitStdout("GitVcsDriver.resolveGitCommonDir", cwd, [ @@ -946,10 +974,72 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* return path.isAbsolute(gitCommonDir) ? gitCommonDir : path.resolve(cwd, gitCommonDir); }); + const resolveCanonicalGitObjectDir = Effect.fn("resolveCanonicalGitObjectDir")(function* ( + cwd: string, + ) { + const gitCommonDir = yield* resolveGitCommonDir(cwd); + const objectDir = path.join(gitCommonDir, "objects"); + return yield* fileSystem.realPath(objectDir).pipe(Effect.orElseSucceed(() => objectDir)); + }); + + const statusRefreshLocks = yield* Ref.make>(new Map()); + const statusRefreshLocksGuard = yield* Semaphore.make(1); + + const withStatusRefreshLock = (objectDir: string, effect: Effect.Effect) => + Effect.acquireUseRelease( + statusRefreshLocksGuard.withPermits(1)( + Effect.gen(function* () { + const current = yield* Ref.get(statusRefreshLocks); + const existing = current.get(objectDir); + if (existing) { + yield* Ref.set( + statusRefreshLocks, + new Map(current).set(objectDir, { + semaphore: existing.semaphore, + users: existing.users + 1, + }), + ); + return existing.semaphore; + } + + const semaphore = yield* Semaphore.make(1); + yield* Ref.set( + statusRefreshLocks, + new Map(current).set(objectDir, { semaphore, users: 1 }), + ); + return semaphore; + }), + ), + (semaphore) => semaphore.withPermits(1)(effect), + (semaphore) => + statusRefreshLocksGuard.withPermits(1)( + Ref.update(statusRefreshLocks, (current) => { + const existing = current.get(objectDir); + if (!existing || existing.semaphore !== semaphore) { + return current; + } + + const next = new Map(current); + if (existing.users === 1) { + next.delete(objectDir); + } else { + next.set(objectDir, { + semaphore, + users: existing.users - 1, + }); + } + return next; + }), + ), + ); + const refreshStatusRemoteCacheEntry = Effect.fn("refreshStatusRemoteCacheEntry")(function* ( cacheKey: StatusRemoteRefreshCacheKey, ) { - yield* fetchRemoteForStatus(cacheKey.gitCommonDir, cacheKey.remoteName); + yield* withStatusRefreshLock( + cacheKey.objectDir, + fetchRemoteForStatus(cacheKey.cwd, cacheKey.remoteName), + ); return true as const; }); @@ -967,16 +1057,23 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* ) { const upstream = yield* resolveCurrentUpstream(cwd); if (!upstream) return; - const gitCommonDir = yield* resolveGitCommonDir(cwd); + const objectDir = yield* resolveCanonicalGitObjectDir(cwd); yield* Cache.get( statusRemoteRefreshCache, - new StatusRemoteRefreshCacheKey({ - gitCommonDir, - remoteName: upstream.remoteName, - }), + new StatusRemoteRefreshCacheKey(objectDir, upstream.remoteName, cwd), ); }); + const refreshStatusUpstream: GitVcsDriver.GitVcsDriver["Service"]["refreshStatusUpstream"] = + Effect.fn("refreshStatusUpstream")(function* (cwd) { + yield* refreshStatusUpstreamIfStale(cwd).pipe( + Effect.catchTags({ + GitCommandError: (error) => + isMissingGitCwdError(error) ? Effect.void : Effect.fail(error), + }), + ); + }); + const resolveDefaultBranchName = ( cwd: string, remoteName: string, @@ -1474,26 +1571,14 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* const statusDetails: GitVcsDriver.GitVcsDriver["Service"]["statusDetails"] = Effect.fn( "statusDetails", )(function* (cwd) { - yield* refreshStatusUpstreamIfStale(cwd).pipe( - Effect.catchTags({ - GitCommandError: (error) => - isMissingGitCwdError(error) ? Effect.void : Effect.fail(error), - }), - Effect.ignoreCause({ log: true }), - ); + yield* refreshStatusUpstream(cwd).pipe(Effect.ignore); return yield* readStatusDetailsLocal(cwd); }); const statusDetailsRemote: GitVcsDriver.GitVcsDriver["Service"]["statusDetailsRemote"] = Effect.fn("statusDetailsRemote")(function* (cwd, options) { if (options?.refreshUpstream !== false) { - yield* refreshStatusUpstreamIfStale(cwd).pipe( - Effect.catchTags({ - GitCommandError: (error) => - isMissingGitCwdError(error) ? Effect.void : Effect.fail(error), - }), - Effect.ignoreCause({ log: true }), - ); + yield* refreshStatusUpstream(cwd).pipe(Effect.ignore); } return yield* readStatusDetailsRemote(cwd); }); @@ -2535,6 +2620,7 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* statusDetails, statusDetailsLocal, statusDetailsRemote, + refreshStatusUpstream, prepareCommitContext, commit, pushCurrentBranch, diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts index 032e48e4612..26a0e99b057 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts @@ -69,6 +69,7 @@ function makeTestLayer(state: { remoteStatusCalls: number; localInvalidationCalls: number; remoteInvalidationCalls: number; + remoteRefreshCalls?: number; remoteStatusRefreshUpstreamValues?: Array; }) { return VcsStatusBroadcaster.layer.pipe( @@ -86,6 +87,10 @@ function makeTestLayer(state: { state.remoteStatusRefreshUpstreamValues?.push(options?.refreshUpstream); return state.currentRemoteStatus; }), + refreshStatusUpstream: () => + Effect.sync(() => { + state.remoteRefreshCalls = (state.remoteRefreshCalls ?? 0) + 1; + }), invalidateLocalStatus: () => Effect.sync(() => { state.localInvalidationCalls += 1; @@ -545,6 +550,96 @@ describe("VcsStatusBroadcaster", () => { ); }); + it.effect("publishes stale remote status and backs off when upstream refresh fails", () => { + const state = { + refreshCalls: 0, + remoteStatusCalls: 0, + remoteInvalidationCalls: 0, + remoteStatusRefreshUpstreamValues: [] as Array, + }; + let firstRefreshDeferred: Deferred.Deferred | null = null; + const testLayer = VcsStatusBroadcaster.layer.pipe( + Layer.provideMerge(NodeServices.layer), + Layer.provide( + Layer.mock(GitWorkflowService.GitWorkflowService)({ + localStatus: () => Effect.succeed(baseLocalStatus), + refreshStatusUpstream: () => + Effect.suspend(() => { + state.refreshCalls += 1; + if (state.refreshCalls !== 1) { + return Effect.void; + } + return Effect.fail( + new GitManagerError({ + operation: "GitWorkflowService.refreshStatusUpstream", + cwd: "/repo", + detail: "background fetch failed", + }), + ).pipe( + Effect.ensuring( + firstRefreshDeferred + ? Deferred.succeed(firstRefreshDeferred, undefined).pipe(Effect.ignore) + : Effect.void, + ), + ); + }), + remoteStatus: (_input, options) => + Effect.sync(() => { + state.remoteStatusCalls += 1; + state.remoteStatusRefreshUpstreamValues.push(options?.refreshUpstream); + return baseRemoteStatus; + }), + invalidateRemoteStatus: () => + Effect.sync(() => { + state.remoteInvalidationCalls += 1; + }), + } satisfies Partial), + ), + ); + + return Effect.gen(function* () { + const broadcaster = yield* VcsStatusBroadcaster.VcsStatusBroadcaster; + const scope = yield* Scope.make(); + firstRefreshDeferred = yield* Deferred.make(); + const remoteUpdatedDeferred = yield* Deferred.make(); + yield* Stream.runForEach( + broadcaster.streamStatus( + { cwd: "/repo" }, + { automaticRemoteRefreshInterval: Effect.succeed(Duration.seconds(1)) }, + ), + (event) => + event._tag === "remoteUpdated" + ? Deferred.succeed(remoteUpdatedDeferred, event).pipe(Effect.ignore) + : Effect.void, + ).pipe(Effect.forkIn(scope)); + + yield* Deferred.await(firstRefreshDeferred); + const remoteUpdated = yield* Deferred.await(remoteUpdatedDeferred); + yield* Effect.yieldNow; + + assert.deepStrictEqual(remoteUpdated, { + _tag: "remoteUpdated", + remote: baseRemoteStatus, + } satisfies VcsStatusStreamEvent); + assert.equal(state.refreshCalls, 1); + assert.equal(state.remoteStatusCalls, 1); + assert.equal(state.remoteInvalidationCalls, 1); + assert.deepStrictEqual(state.remoteStatusRefreshUpstreamValues, [false]); + + yield* TestClock.adjust(Duration.seconds(29)); + assert.equal(state.refreshCalls, 1); + + yield* TestClock.adjust(Duration.seconds(1)); + yield* Effect.yieldNow; + assert.equal(state.refreshCalls, 2); + assert.equal(state.remoteStatusCalls, 2); + assert.equal(state.remoteInvalidationCalls, 2); + assert.deepStrictEqual(state.remoteStatusRefreshUpstreamValues, [false, false]); + + yield* Scope.close(scope, Exit.void); + }).pipe(Effect.provide(Layer.merge(testLayer, TestClock.layer()))); + }); + it.effect("delays automatic refresh when a cached remote snapshot is available", () => { const state = { currentLocalStatus: baseLocalStatus, @@ -667,6 +762,7 @@ describe("VcsStatusBroadcaster", () => { : Effect.void, ), ), + refreshStatusUpstream: () => Effect.void, invalidateLocalStatus: () => Effect.sync(() => { state.localInvalidationCalls += 1; diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.ts b/apps/server/src/vcs/VcsStatusBroadcaster.ts index c238154f58c..b74c40caff6 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.ts @@ -355,11 +355,19 @@ export const make = Effect.gen(function* () { cwd: string, options?: { readonly refreshUpstream?: boolean }, ) { + const refreshExit = + options?.refreshUpstream === false + ? Exit.void + : yield* workflow.refreshStatusUpstream(cwd).pipe(Effect.exit); if (options?.refreshUpstream !== false) { yield* workflow.invalidateRemoteStatus(cwd); } - const remote = yield* workflow.remoteStatus({ cwd }, options); - return yield* updateCachedRemoteStatus(cwd, remote, { publish: true }); + const remote = yield* workflow.remoteStatus({ cwd }, { refreshUpstream: false }); + const updated = yield* updateCachedRemoteStatus(cwd, remote, { publish: true }); + if (Exit.isFailure(refreshExit)) { + return yield* Effect.failCause(refreshExit.cause); + } + return updated; }); const refreshStatus: VcsStatusBroadcaster["Service"]["refreshStatus"] = Effect.fn( From 16e428220194b7075e12f026bad45b31d483ed80 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:14:37 -0700 Subject: [PATCH 05/15] perf(server): avoid tracing provider log hot paths --- apps/server/src/provider/Layers/ClaudeAdapter.ts | 2 +- apps/server/src/provider/Layers/CodexAdapter.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index 97a93f85829..09c4a7b3ce2 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -1387,7 +1387,7 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( const offerRuntimeEvent = (event: ProviderRuntimeEvent): Effect.Effect => Queue.offer(runtimeEventQueue, event).pipe(Effect.asVoid); - const logNativeSdkMessage = Effect.fn("logNativeSdkMessage")(function* ( + const logNativeSdkMessage = Effect.fnUntraced(function* ( context: ClaudeSessionContext, message: SDKMessage, ) { diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 270126e934b..09e8ad37c70 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -1637,7 +1637,7 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( ), ); - const writeNativeEvent = Effect.fn("writeNativeEvent")(function* (event: ProviderEvent) { + const writeNativeEvent = Effect.fnUntraced(function* (event: ProviderEvent) { if (!nativeEventLogger) { return; } From 3d0f0744428493bb854ef891cc42e92e9bedfb19 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:14:52 -0700 Subject: [PATCH 06/15] fix(server): bound child process shutdown --- apps/server/src/bin.ts | 6 +- .../cli/boundedChildProcessSpawner.test.ts | 255 ++++++++++++++++++ .../src/cli/boundedChildProcessSpawner.ts | 158 +++++++++++ 3 files changed, 418 insertions(+), 1 deletion(-) create mode 100644 apps/server/src/cli/boundedChildProcessSpawner.test.ts create mode 100644 apps/server/src/cli/boundedChildProcessSpawner.ts diff --git a/apps/server/src/bin.ts b/apps/server/src/bin.ts index 9aaf41893c1..f387f628b45 100644 --- a/apps/server/src/bin.ts +++ b/apps/server/src/bin.ts @@ -10,12 +10,16 @@ import packageJson from "../package.json" with { type: "json" }; import { authCommand } from "./cli/auth.ts"; import { connectCommand } from "./cli/connect.ts"; import { hasCloudPublicConfig } from "./cloud/publicConfig.ts"; +import * as BoundedChildProcessSpawner from "./cli/boundedChildProcessSpawner.ts"; import { sharedServerCommandFlags } from "./cli/config.ts"; import { projectCommand } from "./cli/project.ts"; import { runServerCommand, serveCommand, startCommand } from "./cli/server.ts"; import { installNodeShutdownSignalEscalation } from "./cli/shutdownSignalEscalation.ts"; -const CliRuntimeLayer = Layer.mergeAll(NodeServices.layer, NetService.layer); +const CliRuntimeLayer = Layer.mergeAll( + BoundedChildProcessSpawner.layer().pipe(Layer.provideMerge(NodeServices.layer)), + NetService.layer, +); const connectPublicConfigMissingMessage = "T3 Connect commands are unavailable: this build is missing T3 Connect public configuration."; diff --git a/apps/server/src/cli/boundedChildProcessSpawner.test.ts b/apps/server/src/cli/boundedChildProcessSpawner.test.ts new file mode 100644 index 00000000000..b382dc14a4e --- /dev/null +++ b/apps/server/src/cli/boundedChildProcessSpawner.test.ts @@ -0,0 +1,255 @@ +import { NodeServices } from "@effect/platform-node"; +import { expect, it } from "@effect/vitest"; +import { HostProcessPlatform } from "@t3tools/shared/hostProcess"; +import * as Deferred from "effect/Deferred"; +import * as Effect from "effect/Effect"; +import * as Exit from "effect/Exit"; +import * as Fiber from "effect/Fiber"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as Scope from "effect/Scope"; +import * as Sink from "effect/Sink"; +import * as Stream from "effect/Stream"; +import { TestClock } from "effect/testing"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + +import { layer, make, type ChildProcessShutdownSignal } from "./boundedChildProcessSpawner.ts"; + +const makeHandle = (input: { + readonly pid: number; + readonly exitCode: Effect.Effect; + readonly isRunning: Effect.Effect; + readonly kill?: ChildProcessSpawner.ChildProcessHandle["kill"]; + readonly unref?: ChildProcessSpawner.ChildProcessHandle["unref"]; +}) => + ChildProcessSpawner.makeHandle({ + pid: ChildProcessSpawner.ProcessId(input.pid), + exitCode: input.exitCode, + isRunning: input.isRunning, + kill: input.kill ?? (() => Effect.void), + unref: input.unref ?? Effect.succeed(Effect.void), + stdin: Sink.drain, + stdout: Stream.empty, + stderr: Stream.empty, + all: Stream.empty, + getInputFd: () => Sink.drain, + getOutputFd: () => Stream.empty, + }); + +const errnoCode = (cause: unknown) => + cause instanceof Error ? Reflect.get(cause, "code") : undefined; + +const isProcessAlive = (pid: number) => { + try { + process.kill(pid, 0); + return true; + } catch (cause) { + return errnoCode(cause) !== "ESRCH"; + } +}; + +const killProcessGroup = (pid: number) => { + try { + process.kill(-pid, "SIGKILL"); + } catch { + try { + process.kill(pid, "SIGKILL"); + } catch { + // The process group has already exited. + } + } +}; + +const readFixturePids = (line: string) => { + const value: unknown = JSON.parse(line); + if (typeof value !== "object" || value === null) { + throw new Error("Expected the child fixture to print an object"); + } + const pid = Reflect.get(value, "pid"); + const descendantPid = Reflect.get(value, "descendantPid"); + if (typeof pid !== "number" || typeof descendantPid !== "number") { + throw new Error("Expected numeric child fixture pids"); + } + return { pid, descendantPid }; +}; + +const waitForProcessesToStop = Effect.fn("waitForProcessesToStop")(function* ( + pids: ReadonlyArray, + timeoutMs: number, +) { + let remainingMs = timeoutMs; + while (pids.some(isProcessAlive)) { + if (remainingMs <= 0) return false; + const delayMs = Math.min(10, remainingMs); + yield* Effect.sleep(delayMs); + remainingMs -= delayMs; + } + return true; +}); + +it.effect("closes the delegate scope after a natural child exit", () => + Effect.gen(function* () { + const exited = yield* Deferred.make(); + const delegateClosed = yield* Deferred.make(); + const handle = makeHandle({ + pid: 1, + exitCode: Deferred.await(exited), + isRunning: Deferred.isDone(exited).pipe(Effect.map((done) => !done)), + }); + const delegate = ChildProcessSpawner.make(() => + Effect.gen(function* () { + yield* Effect.addFinalizer(() => Deferred.succeed(delegateClosed, undefined)); + return handle; + }), + ); + const spawner = make(delegate); + const callerScope = yield* Scope.make(); + + const spawned = yield* spawner + .spawn(ChildProcess.make("unused")) + .pipe(Effect.provideService(Scope.Scope, callerScope)); + expect(spawned.pid).toBe(handle.pid); + + yield* Deferred.succeed(exited, ChildProcessSpawner.ExitCode(0)); + yield* Deferred.await(delegateClosed).pipe(Effect.timeout("1 second")); + yield* Scope.close(callerScope, Exit.void); + }), +); + +it.effect("preserves unref as an opt-out from scope-owned termination", () => + Effect.gen(function* () { + const exited = yield* Deferred.make(); + const signals: Array = []; + let running = true; + const handle = makeHandle({ + pid: 43, + exitCode: Deferred.await(exited), + isRunning: Effect.sync(() => running), + kill: ({ killSignal } = {}) => + Effect.sync(() => { + if (killSignal === "SIGTERM" || killSignal === "SIGKILL") signals.push(killSignal); + }), + }); + const spawner = make( + ChildProcessSpawner.make(() => Effect.succeed(handle)), + { + termGraceMs: 0, + killGraceMs: 0, + }, + ); + const callerScope = yield* Scope.make(); + const spawned = yield* spawner + .spawn(ChildProcess.make("unused")) + .pipe(Effect.provideService(Scope.Scope, callerScope)); + + const reref = yield* spawned.unref; + yield* Scope.close(callerScope, Exit.void); + + expect(signals).toEqual([]); + yield* reref; + running = false; + yield* Deferred.succeed(exited, ChildProcessSpawner.ExitCode(0)); + }), +); + +it.effect("does not wait for the delegate's unbounded finalizer", () => + Effect.gen(function* () { + const exited = yield* Deferred.make(); + const delegateCloseStarted = yield* Deferred.make(); + const allowDelegateClose = yield* Deferred.make(); + const signals: Array = []; + let running = true; + const handle = makeHandle({ + pid: 42, + exitCode: Deferred.await(exited), + isRunning: Effect.sync(() => running), + kill: ({ killSignal } = {}) => + Effect.sync(() => { + if (killSignal !== "SIGTERM" && killSignal !== "SIGKILL") return; + signals.push(killSignal); + if (killSignal === "SIGKILL") running = false; + }), + }); + const delegate = ChildProcessSpawner.make(() => + Effect.gen(function* () { + yield* Effect.addFinalizer(() => + Deferred.succeed(delegateCloseStarted, undefined).pipe( + Effect.andThen(Deferred.await(allowDelegateClose)), + ), + ); + return handle; + }), + ); + const spawner = make(delegate, { + termGraceMs: 0, + killGraceMs: 0, + }); + const callerScope = yield* Scope.make(); + + yield* spawner + .spawn(ChildProcess.make("unused")) + .pipe(Effect.provideService(Scope.Scope, callerScope)); + yield* Scope.close(callerScope, Exit.void); + + expect(signals).toEqual(["SIGTERM", "SIGKILL"]); + yield* Deferred.await(delegateCloseStarted).pipe(Effect.timeout("1 second")); + yield* Deferred.succeed(allowDelegateClose, undefined); + yield* Deferred.succeed(exited, ChildProcessSpawner.ExitCode(137)); + }), +); + +it.effect( + "kills a TERM-ignoring Node child and its descendant without blocking scope close", + () => { + const descendantScript = ` + process.on("SIGTERM", () => {}); + process.stdout.write("ready\\n"); + setInterval(() => {}, 1_000); + `; + const childScript = ` + const { spawn } = require("node:child_process"); + process.on("SIGTERM", () => {}); + const descendant = spawn(process.execPath, ["-e", ${JSON.stringify(descendantScript)}], { + stdio: ["ignore", "pipe", "inherit"], + }); + descendant.stdout.once("data", () => { + console.log(JSON.stringify({ pid: process.pid, descendantPid: descendant.pid })); + }); + setInterval(() => {}, 1_000); + `; + const boundedLayer = layer({ + termGraceMs: 100, + killGraceMs: 1_000, + pollIntervalMs: 10, + }).pipe(Layer.provideMerge(NodeServices.layer)); + + return Effect.gen(function* () { + const platform = yield* HostProcessPlatform; + if (platform === "win32") return; + + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const callerScope = yield* Scope.make(); + const handle = yield* spawner + .spawn(ChildProcess.make(process.execPath, ["-e", childScript])) + .pipe(Effect.provideService(Scope.Scope, callerScope)); + const line = yield* handle.stdout.pipe( + Stream.decodeText(), + Stream.splitLines, + Stream.runHead, + Effect.timeout("2 seconds"), + Effect.map(Option.getOrThrow), + ); + const pids = readFixturePids(line); + yield* Effect.addFinalizer(() => Effect.sync(() => killProcessGroup(pids.pid))); + + const closeFiber = yield* Scope.close(callerScope, Exit.void).pipe( + Effect.forkDetach({ startImmediately: true }), + ); + yield* Fiber.join(closeFiber).pipe(Effect.timeout("2 seconds")); + + expect(yield* waitForProcessesToStop([pids.pid, pids.descendantPid], 2_000)).toBe(true); + expect(isProcessAlive(pids.pid)).toBe(false); + expect(isProcessAlive(pids.descendantPid)).toBe(false); + }).pipe(Effect.scoped, Effect.provide(boundedLayer), TestClock.withLive); + }, +); diff --git a/apps/server/src/cli/boundedChildProcessSpawner.ts b/apps/server/src/cli/boundedChildProcessSpawner.ts new file mode 100644 index 00000000000..8fa934ed469 --- /dev/null +++ b/apps/server/src/cli/boundedChildProcessSpawner.ts @@ -0,0 +1,158 @@ +import * as Effect from "effect/Effect"; +import * as Exit from "effect/Exit"; +import * as Layer from "effect/Layer"; +import * as Scope from "effect/Scope"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + +export type ChildProcessShutdownSignal = "SIGTERM" | "SIGKILL"; + +export interface BoundedChildProcessSpawnerOptions { + readonly termGraceMs?: number; + readonly killGraceMs?: number; + readonly pollIntervalMs?: number; +} + +const DEFAULT_TERM_GRACE_MS = 2_000; +const DEFAULT_KILL_GRACE_MS = 1_000; +const DEFAULT_POLL_INTERVAL_MS = 25; + +const normalizedDelay = (value: number | undefined, fallback: number) => + value === undefined || !Number.isFinite(value) ? fallback : Math.max(0, value); + +const isStillRunning = (handle: ChildProcessSpawner.ChildProcessHandle) => + handle.isRunning.pipe(Effect.orElseSucceed(() => true)); + +const waitUntilStopped = Effect.fn("BoundedChildProcessSpawner.waitUntilStopped")(function* ( + handle: ChildProcessSpawner.ChildProcessHandle, + timeoutMs: number, + pollIntervalMs: number, +) { + let remainingMs = timeoutMs; + + while (yield* isStillRunning(handle)) { + if (remainingMs <= 0) return false; + const delayMs = Math.min(pollIntervalMs, remainingMs); + yield* Effect.sleep(delayMs); + remainingMs -= delayMs; + } + + return true; +}); + +const closeScopeDetached = Effect.fn("BoundedChildProcessSpawner.closeScopeDetached")(function* ( + scope: Scope.Closeable, +) { + yield* Scope.close(scope, Exit.void).pipe( + Effect.ignoreCause({ log: true }), + Effect.forkDetach({ startImmediately: true }), + ); +}); + +export const make = ( + delegate: ChildProcessSpawner.ChildProcessSpawner["Service"], + options: BoundedChildProcessSpawnerOptions = {}, +) => { + const termGraceMs = normalizedDelay(options.termGraceMs, DEFAULT_TERM_GRACE_MS); + const killGraceMs = normalizedDelay(options.killGraceMs, DEFAULT_KILL_GRACE_MS); + const pollIntervalMs = Math.max( + 1, + normalizedDelay(options.pollIntervalMs, DEFAULT_POLL_INTERVAL_MS), + ); + const sendSignal = Effect.fn("BoundedChildProcessSpawner.sendSignal")(function* ( + handle: ChildProcessSpawner.ChildProcessHandle, + signal: ChildProcessShutdownSignal, + ) { + yield* handle + .kill({ killSignal: signal }) + .pipe(Effect.ignore, Effect.forkDetach({ startImmediately: true })); + }); + + const shutdown = Effect.fn("BoundedChildProcessSpawner.shutdown")(function* ( + handle: ChildProcessSpawner.ChildProcessHandle, + childScope: Scope.Closeable, + isReferenced: () => boolean, + ) { + if (isReferenced() && (yield* isStillRunning(handle))) { + yield* sendSignal(handle, "SIGTERM"); + const stoppedAfterTerm = yield* waitUntilStopped(handle, termGraceMs, pollIntervalMs); + + if (!stoppedAfterTerm) { + yield* sendSignal(handle, "SIGKILL"); + const stoppedAfterKill = yield* waitUntilStopped(handle, killGraceMs, pollIntervalMs); + if (!stoppedAfterKill) { + yield* Effect.logWarning("Child process did not stop after SIGKILL grace period", { + pid: Number(handle.pid), + killGraceMs, + }); + } + } + } + + // The platform spawner's own finalizer can wait forever for an exit event. + // Run it detached so it can finish normally without blocking the caller's + // scope when the operating system does not report an exit in time. + yield* closeScopeDetached(childScope); + }); + + const spawn = Effect.fn("BoundedChildProcessSpawner.spawn")(function* ( + command: ChildProcess.Command, + ) { + const childScope = yield* Scope.make("sequential"); + const spawned = yield* delegate + .spawn(command) + .pipe(Effect.provideService(Scope.Scope, childScope), Effect.exit); + + if (Exit.isFailure(spawned)) { + yield* closeScopeDetached(childScope); + return yield* Effect.failCause(spawned.cause); + } + + const delegateHandle = spawned.value; + let referenced = true; + const handle = ChildProcessSpawner.makeHandle({ + pid: delegateHandle.pid, + exitCode: delegateHandle.exitCode, + isRunning: delegateHandle.isRunning, + kill: delegateHandle.kill, + stdin: delegateHandle.stdin, + stdout: delegateHandle.stdout, + stderr: delegateHandle.stderr, + all: delegateHandle.all, + getInputFd: delegateHandle.getInputFd, + getOutputFd: delegateHandle.getOutputFd, + unref: delegateHandle.unref.pipe( + Effect.tap(() => + Effect.sync(() => { + referenced = false; + }), + ), + Effect.map((reref) => + reref.pipe( + Effect.tap(() => + Effect.sync(() => { + referenced = true; + }), + ), + ), + ), + ), + }); + yield* Effect.addFinalizer(() => shutdown(handle, childScope, () => referenced)); + yield* handle.exitCode.pipe( + Effect.exit, + Effect.andThen(Scope.close(childScope, Exit.void)), + Effect.ignoreCause({ log: true }), + Effect.forkDetach({ startImmediately: true }), + ); + + return handle; + }, Effect.uninterruptible); + + return ChildProcessSpawner.make(spawn); +}; + +export const layer = (options?: BoundedChildProcessSpawnerOptions) => + Layer.effect( + ChildProcessSpawner.ChildProcessSpawner, + Effect.map(ChildProcessSpawner.ChildProcessSpawner, (delegate) => make(delegate, options)), + ); From ed0591335e8fca48ce555fafeab9afd1ee163d91 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:18:24 -0700 Subject: [PATCH 07/15] fix(server): bound VCS status broadcaster state --- .../src/vcs/VcsStatusBroadcaster.test.ts | 71 ++++++++++++++++ apps/server/src/vcs/VcsStatusBroadcaster.ts | 83 ++++++++++++------- 2 files changed, 126 insertions(+), 28 deletions(-) diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts index 26a0e99b057..5e85345eb3a 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts @@ -130,6 +130,40 @@ describe("VcsStatusBroadcaster", () => { }).pipe(Effect.provide(makeTestLayer(state))); }); + it.effect("evicts the least recently used status after reaching cache capacity", () => { + const state = { + currentLocalStatus: baseLocalStatus, + currentRemoteStatus: baseRemoteStatus, + localStatusCalls: 0, + remoteStatusCalls: 0, + localInvalidationCalls: 0, + remoteInvalidationCalls: 0, + }; + + return Effect.gen(function* () { + const broadcaster = yield* VcsStatusBroadcaster.VcsStatusBroadcaster; + + for (let index = 0; index < VcsStatusBroadcaster.VCS_STATUS_CACHE_CAPACITY; index += 1) { + yield* broadcaster.getStatus({ cwd: `/repo-${index}` }); + } + + assert.equal(state.localStatusCalls, VcsStatusBroadcaster.VCS_STATUS_CACHE_CAPACITY); + assert.equal(state.remoteStatusCalls, VcsStatusBroadcaster.VCS_STATUS_CACHE_CAPACITY); + + yield* broadcaster.getStatus({ cwd: "/repo-0" }); + yield* broadcaster.getStatus({ cwd: "/repo-overflow" }); + yield* broadcaster.getStatus({ cwd: "/repo-0" }); + + assert.equal(state.localStatusCalls, VcsStatusBroadcaster.VCS_STATUS_CACHE_CAPACITY + 1); + assert.equal(state.remoteStatusCalls, VcsStatusBroadcaster.VCS_STATUS_CACHE_CAPACITY + 1); + + yield* broadcaster.getStatus({ cwd: "/repo-1" }); + + assert.equal(state.localStatusCalls, VcsStatusBroadcaster.VCS_STATUS_CACHE_CAPACITY + 2); + assert.equal(state.remoteStatusCalls, VcsStatusBroadcaster.VCS_STATUS_CACHE_CAPACITY + 2); + }).pipe(Effect.provide(makeTestLayer(state))); + }); + it.effect("refreshes the cached snapshot after explicit invalidation", () => { const state = { currentLocalStatus: baseLocalStatus, @@ -726,6 +760,43 @@ describe("VcsStatusBroadcaster", () => { }); }); + it.effect("evicts the cached status after the last stream subscriber disconnects", () => { + const state = { + currentLocalStatus: baseLocalStatus, + currentRemoteStatus: baseRemoteStatus, + localStatusCalls: 0, + remoteStatusCalls: 0, + localInvalidationCalls: 0, + remoteInvalidationCalls: 0, + }; + + return Effect.gen(function* () { + const broadcaster = yield* VcsStatusBroadcaster.VcsStatusBroadcaster; + const remoteUpdated = yield* Deferred.make(); + const scope = yield* Scope.make(); + yield* Stream.runForEach( + broadcaster.streamStatus( + { cwd: "/repo" }, + { automaticRemoteRefreshInterval: Effect.succeed(Duration.zero) }, + ), + (event) => + event._tag === "remoteUpdated" + ? Deferred.succeed(remoteUpdated, event).pipe(Effect.ignore) + : Effect.void, + ).pipe(Effect.forkIn(scope)); + + yield* Deferred.await(remoteUpdated); + assert.equal(state.localStatusCalls, 1); + assert.equal(state.remoteStatusCalls, 1); + + yield* Scope.close(scope, Exit.void); + yield* broadcaster.getStatus({ cwd: "/repo" }); + + assert.equal(state.localStatusCalls, 2); + assert.equal(state.remoteStatusCalls, 2); + }).pipe(Effect.provide(makeTestLayer(state))); + }); + it.effect("stops the remote poller after the last stream subscriber disconnects", () => { const state = { currentLocalStatus: baseLocalStatus, diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.ts b/apps/server/src/vcs/VcsStatusBroadcaster.ts index b74c40caff6..cf89f77e9c8 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.ts @@ -29,6 +29,7 @@ const VCS_STATUS_REFRESH_FAILURE_BASE_DELAY = Duration.seconds(30); const VCS_STATUS_REFRESH_FAILURE_MAX_DELAY = Duration.minutes(15); const MAX_FAILURE_DIAGNOSTIC_VALUES = 8; const MAX_FAILURE_DIAGNOSTIC_VALUE_LENGTH = 128; +export const VCS_STATUS_CACHE_CAPACITY = 128; function boundedDiagnosticValue(value: string): string { return value.slice(0, MAX_FAILURE_DIAGNOSTIC_VALUE_LENGTH); @@ -188,13 +189,32 @@ export const make = Effect.gen(function* () { const broadcasterScope = yield* Effect.acquireRelease(Scope.make(), (scope) => Scope.close(scope, Exit.void), ); - const cacheRef = yield* Ref.make(new Map()); + const statusCache = new Map(); const pollersRef = yield* SynchronizedRef.make(new Map()); + const setCachedStatus = (cwd: string, status: CachedVcsStatus) => { + statusCache.delete(cwd); + statusCache.set(cwd, status); + + while (statusCache.size > VCS_STATUS_CACHE_CAPACITY) { + const oldestCwd = statusCache.keys().next().value; + if (oldestCwd === undefined) { + break; + } + statusCache.delete(oldestCwd); + } + }; + const getCachedStatus = Effect.fn("VcsStatusBroadcaster.getCachedStatus")(function* ( cwd: string, ) { - return yield* Ref.get(cacheRef).pipe(Effect.map((cache) => cache.get(cwd) ?? null)); + return yield* Effect.sync(() => { + const cached = statusCache.get(cwd) ?? null; + if (cached) { + setCachedStatus(cwd, cached); + } + return cached; + }); }); const updateCachedLocalStatus = Effect.fn("VcsStatusBroadcaster.updateCachedLocalStatus")( @@ -203,14 +223,13 @@ export const make = Effect.gen(function* () { fingerprint: fingerprintStatusPart(local), value: local, } satisfies CachedValue; - const shouldPublish = yield* Ref.modify(cacheRef, (cache) => { - const previous = cache.get(cwd) ?? { local: null, remote: null }; - const nextCache = new Map(cache); - nextCache.set(cwd, { + const shouldPublish = yield* Effect.sync(() => { + const previous = statusCache.get(cwd) ?? { local: null, remote: null }; + setCachedStatus(cwd, { ...previous, local: nextLocal, }); - return [previous.local?.fingerprint !== nextLocal.fingerprint, nextCache] as const; + return previous.local?.fingerprint !== nextLocal.fingerprint; }); if (options?.publish && shouldPublish) { @@ -233,14 +252,13 @@ export const make = Effect.gen(function* () { fingerprint: fingerprintStatusPart(remote), value: remote, } satisfies CachedValue; - const shouldPublish = yield* Ref.modify(cacheRef, (cache) => { - const previous = cache.get(cwd) ?? { local: null, remote: null }; - const nextCache = new Map(cache); - nextCache.set(cwd, { + const shouldPublish = yield* Effect.sync(() => { + const previous = statusCache.get(cwd) ?? { local: null, remote: null }; + setCachedStatus(cwd, { ...previous, remote: nextRemote, }); - return [previous.remote?.fingerprint !== nextRemote.fingerprint, nextCache] as const; + return previous.remote?.fingerprint !== nextRemote.fingerprint; }); if (options?.publish && shouldPublish) { @@ -271,18 +289,16 @@ export const make = Effect.gen(function* () { fingerprint: fingerprintStatusPart(remote), value: remote, } satisfies CachedValue; - const shouldPublish = yield* Ref.modify(cacheRef, (cache) => { - const previous = cache.get(cwd) ?? { local: null, remote: null }; - const nextCache = new Map(cache); - nextCache.set(cwd, { + const shouldPublish = yield* Effect.sync(() => { + const previous = statusCache.get(cwd) ?? { local: null, remote: null }; + setCachedStatus(cwd, { local: nextLocal, remote: nextRemote, }); - return [ + return ( previous.local?.fingerprint !== nextLocal.fingerprint || - previous.remote?.fingerprint !== nextRemote.fingerprint, - nextCache, - ] as const; + previous.remote?.fingerprint !== nextRemote.fingerprint + ); }); if (options?.publish && shouldPublish) { @@ -506,6 +522,16 @@ export const make = Effect.gen(function* () { if (pollerToInterrupt) { yield* Fiber.interrupt(pollerToInterrupt).pipe(Effect.ignore); + yield* SynchronizedRef.modifyEffect(pollersRef, (activePollers) => { + if (activePollers.has(cwd)) { + return Effect.succeed([undefined, activePollers] as const); + } + + return Effect.sync(() => { + statusCache.delete(cwd); + return [undefined, activePollers] as const; + }); + }); } }); @@ -517,15 +543,16 @@ export const make = Effect.gen(function* () { const initialLocal = yield* getOrLoadLocalStatus(cwd); const cachedStatus = yield* getCachedStatus(cwd); const initialRemote = cachedStatus?.remote?.value ?? null; - yield* retainRemotePoller( - cwd, - options?.automaticRemoteRefreshInterval ?? - Effect.succeed(DEFAULT_VCS_STATUS_REFRESH_INTERVAL), - cachedStatus?.remote === null || cachedStatus?.remote === undefined, + yield* Effect.acquireRelease( + retainRemotePoller( + cwd, + options?.automaticRemoteRefreshInterval ?? + Effect.succeed(DEFAULT_VCS_STATUS_REFRESH_INTERVAL), + cachedStatus?.remote === null || cachedStatus?.remote === undefined, + ), + () => releaseRemotePoller(cwd), ); - const release = releaseRemotePoller(cwd).pipe(Effect.ignore, Effect.asVoid); - return Stream.concat( Stream.make({ _tag: "snapshot" as const, @@ -536,7 +563,7 @@ export const make = Effect.gen(function* () { Stream.filter((event) => event.cwd === cwd), Stream.map((event) => event.event), ), - ).pipe(Stream.ensuring(release)); + ); }), ); From 45d301e83577ad9576fc350dfe2db0222aa94fbd Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:20:51 -0700 Subject: [PATCH 08/15] fix(server): close managed Claude event logs --- apps/server/src/provider/Layers/ClaudeAdapter.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index 09c4a7b3ce2..9da05578b4b 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -1354,6 +1354,8 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( stream: "native", }) : undefined); + const managedNativeEventLogger = + options?.nativeEventLogger === undefined ? nativeEventLogger : undefined; const createQuery = options?.createQuery ?? @@ -3843,6 +3845,7 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( Effect.logError("Failed to emit Claude session shutdown event.", { cause }), ), Effect.tap(() => Queue.shutdown(runtimeEventQueue)), + Effect.tap(() => managedNativeEventLogger?.close() ?? Effect.void), ), ); From b9197b7e4c61d049ba73d5c0397bd5768885e33d Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:23:47 -0700 Subject: [PATCH 09/15] fix(server): bound explicit child process kills --- .../cli/boundedChildProcessSpawner.test.ts | 36 +++++++++++ .../src/cli/boundedChildProcessSpawner.ts | 61 +++++++++++++------ 2 files changed, 80 insertions(+), 17 deletions(-) diff --git a/apps/server/src/cli/boundedChildProcessSpawner.test.ts b/apps/server/src/cli/boundedChildProcessSpawner.test.ts index b382dc14a4e..051f9b14a25 100644 --- a/apps/server/src/cli/boundedChildProcessSpawner.test.ts +++ b/apps/server/src/cli/boundedChildProcessSpawner.test.ts @@ -152,6 +152,42 @@ it.effect("preserves unref as an opt-out from scope-owned termination", () => }), ); +it.effect("bounds explicit handle kill calls and escalates to SIGKILL", () => + Effect.gen(function* () { + const exited = yield* Deferred.make(); + const signals: Array = []; + let running = true; + const handle = makeHandle({ + pid: 44, + exitCode: Deferred.await(exited), + isRunning: Effect.sync(() => running), + kill: ({ killSignal } = {}) => + Effect.sync(() => { + if (killSignal !== "SIGTERM" && killSignal !== "SIGKILL") return; + signals.push(killSignal); + if (killSignal === "SIGKILL") running = false; + }).pipe(Effect.andThen(Deferred.await(exited)), Effect.asVoid), + }); + const spawner = make( + ChildProcessSpawner.make(() => Effect.succeed(handle)), + { + termGraceMs: 0, + killGraceMs: 0, + }, + ); + const callerScope = yield* Scope.make(); + const spawned = yield* spawner + .spawn(ChildProcess.make("unused")) + .pipe(Effect.provideService(Scope.Scope, callerScope)); + + yield* spawned.kill({ forceKillAfter: 0 }); + + expect(signals).toEqual(["SIGTERM", "SIGKILL"]); + yield* Deferred.succeed(exited, ChildProcessSpawner.ExitCode(137)); + yield* Scope.close(callerScope, Exit.void); + }), +); + it.effect("does not wait for the delegate's unbounded finalizer", () => Effect.gen(function* () { const exited = yield* Deferred.make(); diff --git a/apps/server/src/cli/boundedChildProcessSpawner.ts b/apps/server/src/cli/boundedChildProcessSpawner.ts index 8fa934ed469..ad61531f6b6 100644 --- a/apps/server/src/cli/boundedChildProcessSpawner.ts +++ b/apps/server/src/cli/boundedChildProcessSpawner.ts @@ -1,3 +1,4 @@ +import * as Duration from "effect/Duration"; import * as Effect from "effect/Effect"; import * as Exit from "effect/Exit"; import * as Layer from "effect/Layer"; @@ -60,32 +61,58 @@ export const make = ( ); const sendSignal = Effect.fn("BoundedChildProcessSpawner.sendSignal")(function* ( handle: ChildProcessSpawner.ChildProcessHandle, - signal: ChildProcessShutdownSignal, + signal: ChildProcess.Signal, ) { yield* handle .kill({ killSignal: signal }) .pipe(Effect.ignore, Effect.forkDetach({ startImmediately: true })); }); + const kill = Effect.fn("BoundedChildProcessSpawner.kill")(function* ( + handle: ChildProcessSpawner.ChildProcessHandle, + killOptions?: ChildProcess.KillOptions, + ) { + if (!(yield* isStillRunning(handle))) return; + + const initialSignal = killOptions?.killSignal ?? "SIGTERM"; + const initialGraceMs = + initialSignal === "SIGKILL" + ? killGraceMs + : Math.min( + termGraceMs, + normalizedDelay( + killOptions?.forceKillAfter === undefined + ? undefined + : Duration.toMillis(killOptions.forceKillAfter), + termGraceMs, + ), + ); + yield* sendSignal(handle, initialSignal); + const stoppedAfterInitialSignal = yield* waitUntilStopped( + handle, + initialGraceMs, + pollIntervalMs, + ); + if (stoppedAfterInitialSignal) return; + + if (initialSignal !== "SIGKILL") { + yield* sendSignal(handle, "SIGKILL"); + if (yield* waitUntilStopped(handle, killGraceMs, pollIntervalMs)) return; + } + + yield* Effect.logWarning("Child process did not stop after SIGKILL grace period", { + pid: Number(handle.pid), + killGraceMs, + }); + }); + const shutdown = Effect.fn("BoundedChildProcessSpawner.shutdown")(function* ( handle: ChildProcessSpawner.ChildProcessHandle, childScope: Scope.Closeable, isReferenced: () => boolean, ) { - if (isReferenced() && (yield* isStillRunning(handle))) { - yield* sendSignal(handle, "SIGTERM"); - const stoppedAfterTerm = yield* waitUntilStopped(handle, termGraceMs, pollIntervalMs); - - if (!stoppedAfterTerm) { - yield* sendSignal(handle, "SIGKILL"); - const stoppedAfterKill = yield* waitUntilStopped(handle, killGraceMs, pollIntervalMs); - if (!stoppedAfterKill) { - yield* Effect.logWarning("Child process did not stop after SIGKILL grace period", { - pid: Number(handle.pid), - killGraceMs, - }); - } - } + if (isReferenced()) { + yield* kill(handle); } // The platform spawner's own finalizer can wait forever for an exit event. @@ -113,7 +140,7 @@ export const make = ( pid: delegateHandle.pid, exitCode: delegateHandle.exitCode, isRunning: delegateHandle.isRunning, - kill: delegateHandle.kill, + kill: (killOptions) => kill(delegateHandle, killOptions), stdin: delegateHandle.stdin, stdout: delegateHandle.stdout, stderr: delegateHandle.stderr, @@ -137,7 +164,7 @@ export const make = ( ), ), }); - yield* Effect.addFinalizer(() => shutdown(handle, childScope, () => referenced)); + yield* Effect.addFinalizer(() => shutdown(delegateHandle, childScope, () => referenced)); yield* handle.exitCode.pipe( Effect.exit, Effect.andThen(Scope.close(childScope, Exit.void)), From 96afe3f82173d276feec488a64532f768c51fc7b Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:30:42 -0700 Subject: [PATCH 10/15] fix(server): retry interrupted Git status refreshes --- apps/server/src/vcs/GitVcsDriverCore.ts | 11 +-- .../src/vcs/VcsStatusBroadcaster.test.ts | 68 +++++++++++++++++++ 2 files changed, 75 insertions(+), 4 deletions(-) diff --git a/apps/server/src/vcs/GitVcsDriverCore.ts b/apps/server/src/vcs/GitVcsDriverCore.ts index cc8f17066bb..34174e05958 100644 --- a/apps/server/src/vcs/GitVcsDriverCore.ts +++ b/apps/server/src/vcs/GitVcsDriverCore.ts @@ -1,5 +1,6 @@ import * as Arr from "effect/Array"; import * as Cache from "effect/Cache"; +import * as Cause from "effect/Cause"; import * as Crypto from "effect/Crypto"; import * as DateTime from "effect/DateTime"; import * as Duration from "effect/Duration"; @@ -75,6 +76,11 @@ const NON_REPOSITORY_STATUS_DETAILS = Object.freeze(exit: Exit.Exit) => { + if (Exit.isSuccess(exit)) return STATUS_UPSTREAM_REFRESH_INTERVAL; + return Cause.hasInterrupts(exit.cause) ? Duration.zero : STATUS_UPSTREAM_REFRESH_FAILURE_COOLDOWN; +}; const NON_REPOSITORY_REMOTE_STATUS_DETAILS = Object.freeze({ isRepo: false, isDefaultBranch: false, @@ -1046,10 +1052,7 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* const statusRemoteRefreshCache = yield* Cache.makeWith(refreshStatusRemoteCacheEntry, { capacity: STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY, // Keep successful refreshes warm and briefly back off failed refreshes to avoid retry storms. - timeToLive: (exit) => - Exit.isSuccess(exit) - ? STATUS_UPSTREAM_REFRESH_INTERVAL - : STATUS_UPSTREAM_REFRESH_FAILURE_COOLDOWN, + timeToLive: statusUpstreamRefreshCacheTimeToLive, }); const refreshStatusUpstreamIfStale = Effect.fn("refreshStatusUpstreamIfStale")(function* ( diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts index 5e85345eb3a..e9855e80a20 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts @@ -1,5 +1,6 @@ import { assert, it, describe } from "@effect/vitest"; import * as NodeServices from "@effect/platform-node/NodeServices"; +import * as Cache from "effect/Cache"; import * as Cause from "effect/Cause"; import * as Deferred from "effect/Deferred"; import * as Duration from "effect/Duration"; @@ -21,6 +22,7 @@ import type { } from "@t3tools/contracts"; import { GitManagerError } from "@t3tools/contracts"; +import { statusUpstreamRefreshCacheTimeToLive } from "./GitVcsDriverCore.ts"; import * as VcsStatusBroadcaster from "./VcsStatusBroadcaster.ts"; import * as GitWorkflowService from "../git/GitWorkflowService.ts"; @@ -797,6 +799,72 @@ describe("VcsStatusBroadcaster", () => { }).pipe(Effect.provide(makeTestLayer(state))); }); + it.effect("restarts an interrupted refresh after a subscriber reconnects", () => + Effect.gen(function* () { + const firstRefreshStarted = yield* Deferred.make(); + const secondRefreshStarted = yield* Deferred.make(); + const secondRemoteUpdated = yield* Deferred.make(); + let refreshLookups = 0; + const refreshCache = yield* Cache.makeWith( + () => + Effect.gen(function* () { + refreshLookups += 1; + if (refreshLookups === 1) { + yield* Deferred.succeed(firstRefreshStarted, undefined); + return yield* Effect.never; + } + yield* Deferred.succeed(secondRefreshStarted, undefined); + }), + { + capacity: 1, + timeToLive: statusUpstreamRefreshCacheTimeToLive, + }, + ); + const testLayer = VcsStatusBroadcaster.layer.pipe( + Layer.provideMerge(NodeServices.layer), + Layer.provide( + Layer.mock(GitWorkflowService.GitWorkflowService)({ + localStatus: () => Effect.succeed(baseLocalStatus), + refreshStatusUpstream: () => Cache.get(refreshCache, "repo"), + remoteStatus: () => Effect.succeed(baseRemoteStatus), + invalidateRemoteStatus: () => Effect.void, + } satisfies Partial), + ), + ); + + yield* Effect.gen(function* () { + const broadcaster = yield* VcsStatusBroadcaster.VcsStatusBroadcaster; + const firstScope = yield* Scope.make(); + yield* broadcaster + .streamStatus( + { cwd: "/repo" }, + { automaticRemoteRefreshInterval: Effect.succeed(Duration.hours(1)) }, + ) + .pipe(Stream.runDrain, Effect.forkIn(firstScope)); + + yield* Deferred.await(firstRefreshStarted).pipe(Effect.timeout("1 second")); + yield* Scope.close(firstScope, Exit.void); + + const secondScope = yield* Scope.make(); + yield* Stream.runForEach( + broadcaster.streamStatus( + { cwd: "/repo" }, + { automaticRemoteRefreshInterval: Effect.succeed(Duration.hours(1)) }, + ), + (event) => + event._tag === "remoteUpdated" + ? Deferred.succeed(secondRemoteUpdated, undefined).pipe(Effect.ignore) + : Effect.void, + ).pipe(Effect.forkIn(secondScope)); + + yield* Deferred.await(secondRefreshStarted).pipe(Effect.timeout("1 second")); + yield* Deferred.await(secondRemoteUpdated).pipe(Effect.timeout("1 second")); + assert.equal(refreshLookups, 2); + yield* Scope.close(secondScope, Exit.void); + }).pipe(Effect.provide(testLayer), TestClock.withLive); + }), + ); + it.effect("stops the remote poller after the last stream subscriber disconnects", () => { const state = { currentLocalStatus: baseLocalStatus, From c5974d38482bd0c4c512b41f313fd120a3e3acee Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 11:36:10 -0700 Subject: [PATCH 11/15] fix(server): isolate interrupted Git refresh waiters --- apps/server/src/vcs/GitVcsDriverCore.test.ts | 111 +++++++++++++++++++ apps/server/src/vcs/GitVcsDriverCore.ts | 14 +-- 2 files changed, 118 insertions(+), 7 deletions(-) diff --git a/apps/server/src/vcs/GitVcsDriverCore.test.ts b/apps/server/src/vcs/GitVcsDriverCore.test.ts index 4ed52be86d7..3fc50aa0894 100644 --- a/apps/server/src/vcs/GitVcsDriverCore.test.ts +++ b/apps/server/src/vcs/GitVcsDriverCore.test.ts @@ -2,10 +2,12 @@ import * as NodeServices from "@effect/platform-node/NodeServices"; import { assert, it, describe } from "@effect/vitest"; import * as Effect from "effect/Effect"; import * as FileSystem from "effect/FileSystem"; +import * as Fiber from "effect/Fiber"; import * as Layer from "effect/Layer"; import * as Path from "effect/Path"; import * as PlatformError from "effect/PlatformError"; import * as Scope from "effect/Scope"; +import * as TestClock from "effect/testing/TestClock"; import { GitCommandError } from "@t3tools/contracts"; import { HostProcessPlatform } from "@t3tools/shared/hostProcess"; @@ -670,6 +672,115 @@ it.layer(TestLayer)("GitVcsDriver core integration", (it) => { }), ); + it.effect("keeps a waiting worktree refresh alive when the cache owner disconnects", () => + Effect.gen(function* () { + const platform = yield* HostProcessPlatform; + if (platform === "win32") return; + + const cwd = yield* makeTmpDir(); + const origin = yield* makeTmpDir("git-vcs-driver-origin-"); + const worktreeParent = yield* makeTmpDir("git-vcs-driver-worktrees-"); + const wrapperDir = yield* makeTmpDir("git-vcs-driver-wrapper-"); + const { initialBranch } = yield* initRepoWithCommit(cwd); + const fileSystem = yield* FileSystem.FileSystem; + const pathService = yield* Path.Path; + const worktreeCwd = pathService.join(worktreeParent, "feature"); + const realGit = yield* findGitExecutable(); + const wrapperPath = pathService.join(wrapperDir, "git"); + const invocationLogPath = pathService.join(wrapperDir, "fetches.txt"); + const ownerLockPath = pathService.join(wrapperDir, "owner-fetch"); + const ownerStartedPath = pathService.join(wrapperDir, "owner-started"); + const waiterReadyPath = pathService.join(wrapperDir, "waiter-ready"); + const envKeys = [ + "PATH", + "T3_TEST_REAL_GIT", + "T3_TEST_FETCH_INVOCATION_LOG", + "T3_TEST_FETCH_OWNER_LOCK", + "T3_TEST_FETCH_OWNER_STARTED", + "T3_TEST_FETCH_WAITER_CWD", + "T3_TEST_FETCH_WAITER_READY", + ] as const; + const previousEnv = new Map(envKeys.map((key) => [key, process.env[key]])); + + yield* git(origin, ["init", "--bare"]); + yield* git(cwd, ["remote", "add", "origin", origin]); + yield* git(cwd, ["push", "-u", "origin", initialBranch]); + yield* git(cwd, ["worktree", "add", "-b", "feature/interrupted", worktreeCwd]); + yield* git(worktreeCwd, ["push", "-u", "origin", "feature/interrupted"]); + yield* fileSystem.writeFileString( + wrapperPath, + [ + "#!/bin/sh", + "is_fetch=0", + "is_common_dir=0", + 'for arg in "$@"; do', + ' if [ "$arg" = "fetch" ]; then is_fetch=1; fi', + ' if [ "$arg" = "--git-common-dir" ]; then is_common_dir=1; fi', + "done", + 'if [ "$is_fetch" = "1" ]; then', + ' printf "start:%s\\n" "$PWD" >> "$T3_TEST_FETCH_INVOCATION_LOG"', + ' if mkdir "$T3_TEST_FETCH_OWNER_LOCK" 2>/dev/null; then', + ' : > "$T3_TEST_FETCH_OWNER_STARTED"', + " trap 'exit 130' INT TERM HUP", + " while :; do sleep 1; done", + " fi", + ' exec "$T3_TEST_REAL_GIT" "$@"', + "fi", + '"$T3_TEST_REAL_GIT" "$@"', + "status=$?", + 'if [ "$PWD" = "$T3_TEST_FETCH_WAITER_CWD" ] && [ "$is_common_dir" = "1" ]; then', + ' : > "$T3_TEST_FETCH_WAITER_READY"', + "fi", + "exit $status", + "", + ].join("\n"), + ); + yield* fileSystem.chmod(wrapperPath, 0o755); + + yield* Effect.gen(function* () { + process.env.PATH = `${wrapperDir}:${previousEnv.get("PATH") ?? ""}`; + process.env.T3_TEST_REAL_GIT = realGit; + process.env.T3_TEST_FETCH_INVOCATION_LOG = invocationLogPath; + process.env.T3_TEST_FETCH_OWNER_LOCK = ownerLockPath; + process.env.T3_TEST_FETCH_OWNER_STARTED = ownerStartedPath; + process.env.T3_TEST_FETCH_WAITER_CWD = worktreeCwd; + process.env.T3_TEST_FETCH_WAITER_READY = waiterReadyPath; + + const waitForFile = (filePath: string) => + Effect.gen(function* () { + while (!(yield* fileSystem.exists(filePath))) { + yield* Effect.sleep("10 millis"); + } + }).pipe(Effect.timeout("2 seconds")); + const driver = yield* GitVcsDriver.GitVcsDriver; + const owner = yield* driver.refreshStatusUpstream(cwd).pipe(Effect.forkChild); + yield* waitForFile(ownerStartedPath); + + const waiter = yield* driver.refreshStatusUpstream(worktreeCwd).pipe(Effect.forkChild); + yield* waitForFile(waiterReadyPath); + yield* Effect.sleep("50 millis"); + yield* Fiber.interrupt(owner); + yield* Fiber.join(waiter).pipe(Effect.timeout("5 seconds")); + + assert.deepStrictEqual( + (yield* fileSystem.readFileString(invocationLogPath)).trim().split(/\r?\n/), + [`start:${cwd}`, `start:${worktreeCwd}`], + ); + }).pipe( + Effect.ensuring( + Effect.sync(() => { + for (const key of envKeys) { + const previous = previousEnv.get(key); + if (previous === undefined) delete process.env[key]; + else process.env[key] = previous; + } + }), + ), + TestClock.withLive, + ); + }), + ); + it.effect("reuses the no-upstream fallback ahead count for default-branch delta", () => Effect.gen(function* () { const cwd = yield* makeTmpDir(); diff --git a/apps/server/src/vcs/GitVcsDriverCore.ts b/apps/server/src/vcs/GitVcsDriverCore.ts index 34174e05958..3598babd334 100644 --- a/apps/server/src/vcs/GitVcsDriverCore.ts +++ b/apps/server/src/vcs/GitVcsDriverCore.ts @@ -1042,10 +1042,7 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* const refreshStatusRemoteCacheEntry = Effect.fn("refreshStatusRemoteCacheEntry")(function* ( cacheKey: StatusRemoteRefreshCacheKey, ) { - yield* withStatusRefreshLock( - cacheKey.objectDir, - fetchRemoteForStatus(cacheKey.cwd, cacheKey.remoteName), - ); + yield* fetchRemoteForStatus(cacheKey.cwd, cacheKey.remoteName); return true as const; }); @@ -1061,9 +1058,12 @@ export const makeGitVcsDriverCore = Effect.fn("makeGitVcsDriverCore")(function* const upstream = yield* resolveCurrentUpstream(cwd); if (!upstream) return; const objectDir = yield* resolveCanonicalGitObjectDir(cwd); - yield* Cache.get( - statusRemoteRefreshCache, - new StatusRemoteRefreshCacheKey(objectDir, upstream.remoteName, cwd), + yield* withStatusRefreshLock( + objectDir, + Cache.get( + statusRemoteRefreshCache, + new StatusRemoteRefreshCacheKey(objectDir, upstream.remoteName, cwd), + ), ); }); From a9a23e489a843168c610c5bc5c573139c5cc3c71 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 13:27:36 -0700 Subject: [PATCH 12/15] refactor(server): clarify provider log ownership --- .../src/provider/Layers/ClaudeAdapter.test.ts | 1 - apps/server/src/provider/Layers/ClaudeAdapter.ts | 10 ++++------ apps/server/src/provider/Layers/CodexAdapter.ts | 10 ++++------ apps/server/src/provider/Layers/CursorAdapter.ts | 10 ++++------ .../src/provider/Layers/EventNdjsonLogger.test.ts | 3 +++ .../src/provider/Layers/EventNdjsonLogger.ts | 10 +++++++--- .../server/src/provider/Layers/GrokAdapter.test.ts | 2 -- apps/server/src/provider/Layers/GrokAdapter.ts | 10 ++++------ apps/server/src/provider/Layers/OpenCodeAdapter.ts | 14 ++++++-------- .../src/provider/Layers/ProviderService.test.ts | 1 - .../src/provider/acp/AcpNativeLogging.test.ts | 3 --- 11 files changed, 32 insertions(+), 42 deletions(-) diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts index 191bf8e27db..92e733469d0 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts @@ -3703,7 +3703,6 @@ describe("ClaudeAdapterLive", () => { nativeThreadIds.push(threadId ?? null); return Effect.void; }, - close: () => Effect.void, }, }); return Effect.gen(function* () { diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index 9da05578b4b..5e816dc1293 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -1347,15 +1347,13 @@ export const makeClaudeAdapter = Effect.fn("makeClaudeAdapter")(function* ( const claudeEnvironment = yield* makeClaudeEnvironment(claudeSettings, options?.environment).pipe( Effect.provideService(Path.Path, path), ); - const nativeEventLogger = - options?.nativeEventLogger ?? - (options?.nativeEventLogPath !== undefined + const managedNativeEventLogger = + options?.nativeEventLogger === undefined && options?.nativeEventLogPath !== undefined ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { stream: "native", }) - : undefined); - const managedNativeEventLogger = - options?.nativeEventLogger === undefined ? nativeEventLogger : undefined; + : undefined; + const nativeEventLogger = options?.nativeEventLogger ?? managedNativeEventLogger; const createQuery = options?.createQuery ?? diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 09e8ad37c70..63d9778e4b0 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -1354,15 +1354,13 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( const childProcessSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; const crypto = yield* Crypto.Crypto; const serverConfig = yield* Effect.service(ServerConfig); - const nativeEventLogger = - options?.nativeEventLogger ?? - (options?.nativeEventLogPath !== undefined + const managedNativeEventLogger = + options?.nativeEventLogger === undefined && options?.nativeEventLogPath !== undefined ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { stream: "native", }) - : undefined); - const managedNativeEventLogger = - options?.nativeEventLogger === undefined ? nativeEventLogger : undefined; + : undefined; + const nativeEventLogger = options?.nativeEventLogger ?? managedNativeEventLogger; const runtimeEventQueue = yield* Queue.unbounded(); const sessions = new Map(); diff --git a/apps/server/src/provider/Layers/CursorAdapter.ts b/apps/server/src/provider/Layers/CursorAdapter.ts index 59788a2d225..538941ea457 100644 --- a/apps/server/src/provider/Layers/CursorAdapter.ts +++ b/apps/server/src/provider/Layers/CursorAdapter.ts @@ -321,15 +321,13 @@ export function makeCursorAdapter( const childProcessSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; const serverConfig = yield* Effect.service(ServerConfig); const crypto = yield* Crypto.Crypto; - const nativeEventLogger = - options?.nativeEventLogger ?? - (options?.nativeEventLogPath !== undefined + const managedNativeEventLogger = + options?.nativeEventLogger === undefined && options?.nativeEventLogPath !== undefined ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { stream: "native", }) - : undefined); - const managedNativeEventLogger = - options?.nativeEventLogger === undefined ? nativeEventLogger : undefined; + : undefined; + const nativeEventLogger = options?.nativeEventLogger ?? managedNativeEventLogger; const makeAcpNativeLoggers = yield* makeAcpNativeLoggerFactory(); const sessions = new Map(); diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts index 6c2bb1f1cd3..6fa89adff0b 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts @@ -214,6 +214,9 @@ describe("EventNdjsonLogger", () => { const canonical = store.logger("canonical"); const threadId = ThreadId.make("thread-shared"); + assert.notProperty(native, "close"); + assert.notProperty(canonical, "close"); + yield* native.write({ id: "native-event" }, threadId); yield* canonical.write({ type: "item.completed", id: "canonical-event" }, threadId); yield* store.close(); diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index ab950340eb3..96e18929026 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -54,6 +54,9 @@ export type EventNdjsonStream = "native" | "canonical" | "orchestration"; export interface EventNdjsonLogger { readonly filePath: string; readonly write: (event: unknown, threadId: ThreadId | null) => Effect.Effect; +} + +export interface ManagedEventNdjsonLogger extends EventNdjsonLogger { readonly close: () => Effect.Effect; } @@ -579,7 +582,7 @@ export const makeEventNdjsonLogStore = Effect.fnUntraced(function* ( } }); - const view = { filePath, write, close } satisfies EventNdjsonLogger; + const view = { filePath, write } satisfies EventNdjsonLogger; loggerViews.set(stream, view); return view; }; @@ -590,7 +593,7 @@ export const makeEventNdjsonLogStore = Effect.fnUntraced(function* ( export const makeEventNdjsonLogger = Effect.fnUntraced(function* ( filePath: string, options: EventNdjsonLoggerOptions, -): Effect.fn.Return { +): Effect.fn.Return { const store = yield* makeEventNdjsonLogStore(filePath, options).pipe( Effect.catch((error) => logWarning(error.message, { error }).pipe( @@ -598,5 +601,6 @@ export const makeEventNdjsonLogger = Effect.fnUntraced(function* ( ), ), ); - return store?.logger(options.stream); + if (!store) return undefined; + return { ...store.logger(options.stream), close: store.close }; }); diff --git a/apps/server/src/provider/Layers/GrokAdapter.test.ts b/apps/server/src/provider/Layers/GrokAdapter.test.ts index 7b6f0972ae8..219278e820a 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.test.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.test.ts @@ -693,7 +693,6 @@ it.layer(grokAdapterTestLayer)("GrokAdapterLive", (it) => { JSON.stringify(record).includes("late after cancel") ? Deferred.succeed(lateNativeUpdate, undefined).pipe(Effect.asVoid) : Effect.void, - close: () => Effect.void, }, }); @@ -1176,7 +1175,6 @@ it.layer(grokAdapterTestLayer)("GrokAdapterLive", (it) => { record.event.kind === "notification" ? Effect.die(new Error("native log write failed")) : Effect.void, - close: () => Effect.void, }, }); const contentDelta = yield* Deferred.make(); diff --git a/apps/server/src/provider/Layers/GrokAdapter.ts b/apps/server/src/provider/Layers/GrokAdapter.ts index c22b2180183..8354a09a157 100644 --- a/apps/server/src/provider/Layers/GrokAdapter.ts +++ b/apps/server/src/provider/Layers/GrokAdapter.ts @@ -232,13 +232,11 @@ export function makeGrokAdapter(grokSettings: GrokSettings, options?: GrokAdapte const childProcessSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; const serverConfig = yield* Effect.service(ServerConfig); const crypto = yield* Crypto.Crypto; - const nativeEventLogger = - options?.nativeEventLogger ?? - (options?.nativeEventLogPath !== undefined - ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { stream: "native" }) - : undefined); const managedNativeEventLogger = - options?.nativeEventLogger === undefined ? nativeEventLogger : undefined; + options?.nativeEventLogger === undefined && options?.nativeEventLogPath !== undefined + ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { stream: "native" }) + : undefined; + const nativeEventLogger = options?.nativeEventLogger ?? managedNativeEventLogger; const makeAcpNativeLoggers = yield* makeAcpNativeLoggerFactory(); const sessions = new Map(); diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index 1eb6e47bc19..d79514ba5ed 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -433,17 +433,15 @@ export function makeOpenCodeAdapter( const serverConfig = yield* ServerConfig; const openCodeRuntime = yield* OpenCodeRuntime; const crypto = yield* Crypto.Crypto; - const nativeEventLogger = - options?.nativeEventLogger ?? - (options?.nativeEventLogPath !== undefined - ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { - stream: "native", - }) - : undefined); // Only close loggers we created. If the caller passed one in via // `options.nativeEventLogger`, they own its lifecycle. const managedNativeEventLogger = - options?.nativeEventLogger === undefined ? nativeEventLogger : undefined; + options?.nativeEventLogger === undefined && options?.nativeEventLogPath !== undefined + ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { + stream: "native", + }) + : undefined; + const nativeEventLogger = options?.nativeEventLogger ?? managedNativeEventLogger; const runtimeEvents = yield* Queue.unbounded(); const sessions = new Map(); const randomUUIDv4 = crypto.randomUUIDv4.pipe( diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index ccbbce1759f..e7626fed935 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -605,7 +605,6 @@ it.effect("ProviderServiceLive writes canonical events to the emitting thread se canonicalThreadIds.push(threadId ?? null); return Effect.void; }, - close: () => Effect.void, }, }).pipe( Layer.provide(Layer.succeed(ProviderAdapterRegistry.ProviderAdapterRegistry, registry)), diff --git a/apps/server/src/provider/acp/AcpNativeLogging.test.ts b/apps/server/src/provider/acp/AcpNativeLogging.test.ts index 8c92d523aee..940e0547548 100644 --- a/apps/server/src/provider/acp/AcpNativeLogging.test.ts +++ b/apps/server/src/provider/acp/AcpNativeLogging.test.ts @@ -21,7 +21,6 @@ nodeServicesIt("ACP native logging", (it) => { const nativeEventLogger: EventNdjsonLogger = { filePath: "/tmp/provider-native.ndjson", write: (event) => Effect.sync(() => void records.push(event)), - close: () => Effect.void, }; const makeLogger = yield* makeAcpNativeLoggerFactory(); const logger = makeLogger({ @@ -84,7 +83,6 @@ nodeServicesIt("ACP native logging", (it) => { nativeEventLogger: { filePath: "/tmp/provider-native.ndjson", write: () => Effect.die(new Error(secret)), - close: () => Effect.void, }, provider: ProviderDriverKind.make("cursor"), threadId: ThreadId.make("thread-1"), @@ -113,7 +111,6 @@ nodeServicesIt("ACP native logging", (it) => { nativeEventLogger: { filePath: "/tmp/provider-native.ndjson", write: () => Effect.interrupt, - close: () => Effect.void, }, provider: ProviderDriverKind.make("cursor"), threadId: ThreadId.make("thread-1"), From 40c5fc2121092bbb70ea7073497ab1e63132c510 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 13:35:33 -0700 Subject: [PATCH 13/15] fix(server): address vcs and log retention review --- .../provider/Layers/EventNdjsonLogger.test.ts | 38 +++++++----- .../src/provider/Layers/EventNdjsonLogger.ts | 17 +++++- apps/server/src/vcs/GitVcsDriverCore.ts | 7 ++- .../src/vcs/VcsStatusBroadcaster.test.ts | 59 +++++++++++++++++++ apps/server/src/vcs/VcsStatusBroadcaster.ts | 3 + 5 files changed, 104 insertions(+), 20 deletions(-) diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts index 6fa89adff0b..07d9682717f 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.test.ts @@ -44,6 +44,13 @@ function parseLogLine(line: string) { }; } +function threadLogPath(basePath: string, threadSegment: string) { + return NodePath.join( + NodePath.dirname(basePath), + `${NodePath.parse(basePath).name}-${threadSegment}.log`, + ); +} + describe("EventNdjsonLogger", () => { it.effect("logs bounded diagnostics when an event cannot be serialized", () => { const messages: Array = []; @@ -100,8 +107,8 @@ describe("EventNdjsonLogger", () => { ); yield* logger.close(); - const threadOnePath = NodePath.join(tempDir, "thread-1.log"); - const threadTwoPath = NodePath.join(tempDir, "thread-2.log"); + const threadOnePath = threadLogPath(basePath, "thread-1"); + const threadTwoPath = threadLogPath(basePath, "thread-2"); assert.equal(NodeFS.existsSync(threadOnePath), true); assert.equal(NodeFS.existsSync(threadTwoPath), true); @@ -142,7 +149,7 @@ describe("EventNdjsonLogger", () => { yield* logger.write({ id: "evt-invalid-thread" }, "!!!" as unknown as ThreadId); yield* logger.close(); - const globalPath = NodePath.join(tempDir, "_global.log"); + const globalPath = threadLogPath(basePath, "_global"); assert.equal(NodeFS.existsSync(globalPath), true); const lines = NodeFS.readFileSync(globalPath, "utf8") .trim() @@ -185,7 +192,7 @@ describe("EventNdjsonLogger", () => { ); yield* logger.close(); - const globalPath = NodePath.join(tempDir, "_global.log"); + const globalPath = threadLogPath(basePath, "_global"); assert.equal(NodeFS.existsSync(globalPath), true); const lines = NodeFS.readFileSync(globalPath, "utf8") .trim() @@ -221,7 +228,7 @@ describe("EventNdjsonLogger", () => { yield* canonical.write({ type: "item.completed", id: "canonical-event" }, threadId); yield* store.close(); - const lines = NodeFS.readFileSync(NodePath.join(tempDir, "thread-shared.log"), "utf8") + const lines = NodeFS.readFileSync(threadLogPath(basePath, "thread-shared"), "utf8") .trim() .split("\n") .map(parseLogLine); @@ -260,7 +267,7 @@ describe("EventNdjsonLogger", () => { yield* native.write({ type: "content.delta", id: "native-delta" }, threadId); yield* store.close(); - const lines = NodeFS.readFileSync(NodePath.join(tempDir, "thread-filtered.log"), "utf8") + const lines = NodeFS.readFileSync(threadLogPath(basePath, "thread-filtered"), "utf8") .trim() .split("\n") .map(parseLogLine); @@ -308,7 +315,7 @@ describe("EventNdjsonLogger", () => { } yield* store.close(); - const fileStem = "thread-rotate.log"; + const fileStem = NodePath.basename(threadLogPath(basePath, "thread-rotate")); const matchingFiles = NodeFS.readdirSync(tempDir) .filter((entry) => entry === fileStem || entry.startsWith(`${fileStem}.`)) .toSorted(); @@ -360,7 +367,7 @@ describe("EventNdjsonLogger", () => { ); yield* store.close(); - const filePath = NodePath.join(tempDir, "thread-oversized.log"); + const filePath = threadLogPath(basePath, "thread-oversized"); assert.isAtMost(NodeFS.statSync(filePath).size, 256); const line = parseLogLine(NodeFS.readFileSync(filePath, "utf8").trim()); const payload = decodeOmittedRecord(line.payload); @@ -368,7 +375,7 @@ describe("EventNdjsonLogger", () => { assert.equal(payload.eventType, "item.completed"); assert.isAbove(payload.originalBytes, 2_000); - const longTypeFilePath = NodePath.join(tempDir, "thread-oversized-type.log"); + const longTypeFilePath = threadLogPath(basePath, "thread-oversized-type"); assert.isAtMost(NodeFS.statSync(longTypeFilePath).size, 256); const longTypeLine = parseLogLine(NodeFS.readFileSync(longTypeFilePath, "utf8").trim()); const longTypePayload = decodeOmittedRecord(longTypeLine.payload); @@ -383,20 +390,22 @@ describe("EventNdjsonLogger", () => { Effect.gen(function* () { const tempDir = NodeFS.mkdtempSync(NodePath.join(NodeOS.tmpdir(), "t3-provider-log-")); const basePath = NodePath.join(tempDir, "events.log"); - const expiredPath = NodePath.join(tempDir, "expired.log"); - const oldPath = NodePath.join(tempDir, "old.log"); - const newPath = NodePath.join(tempDir, "new.log"); + const expiredPath = threadLogPath(basePath, "expired"); + const oldPath = threadLogPath(basePath, "old"); + const newPath = threadLogPath(basePath, "new"); + const unrelatedLogPath = NodePath.join(tempDir, "unrelated.log"); const ignoredPath = NodePath.join(tempDir, "ignored.txt"); try { yield* TestClock.setTime(1_800_000_000_000); const now = yield* Clock.currentTimeMillis; - for (const filePath of [expiredPath, oldPath, newPath, ignoredPath]) { + for (const filePath of [expiredPath, oldPath, newPath, unrelatedLogPath, ignoredPath]) { NodeFS.writeFileSync(filePath, "x".repeat(400)); } NodeFS.utimesSync(expiredPath, (now - 20_000) / 1_000, (now - 20_000) / 1_000); NodeFS.utimesSync(oldPath, (now - 5_000) / 1_000, (now - 5_000) / 1_000); NodeFS.utimesSync(newPath, now / 1_000, now / 1_000); + NodeFS.utimesSync(unrelatedLogPath, (now - 20_000) / 1_000, (now - 20_000) / 1_000); const store = yield* makeEventNdjsonLogStore(basePath, { maxAgeMs: 10_000, @@ -407,6 +416,7 @@ describe("EventNdjsonLogger", () => { assert.equal(NodeFS.existsSync(expiredPath), false); assert.equal(NodeFS.existsSync(oldPath), false); assert.equal(NodeFS.existsSync(newPath), true); + assert.equal(NodeFS.existsSync(unrelatedLogPath), true); assert.equal(NodeFS.existsSync(ignoredPath), true); } finally { NodeFS.rmSync(tempDir, { recursive: true, force: true }); @@ -442,7 +452,7 @@ describe("EventNdjsonLogger", () => { yield* store.close(); const totalBytes = NodeFS.readdirSync(tempDir, { withFileTypes: true }) - .filter((entry) => entry.isFile() && /\.log(?:\.\d+)?$/u.test(entry.name)) + .filter((entry) => entry.isFile() && /^events-.+\.log(?:\.\d+)?$/u.test(entry.name)) .reduce( (total, entry) => total + NodeFS.statSync(NodePath.join(tempDir, entry.name)).size, 0, diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index 96e18929026..839f28b7023 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -36,7 +36,6 @@ const DEFAULT_MAX_RECORD_BYTES = MEBIBYTE; const DEFAULT_MAX_CACHED_SINKS = 64; const GLOBAL_THREAD_SEGMENT = "_global"; const LOG_SCOPE = "provider-observability"; -const PROVIDER_LOG_FILE_PATTERN = /\.log(?:\.\d+)?$/u; const encodeUnknownJsonString = Schema.encodeUnknownEffect(Schema.UnknownFromJsonString); const transientCanonicalEventTypes = new Set([ @@ -244,6 +243,7 @@ function writeBatchedRecords( function enforceRetention(input: { readonly directory: string; + readonly filePattern: RegExp; readonly maxTotalBytes: number; readonly maxAgeMs: number; readonly now: number; @@ -263,7 +263,7 @@ function enforceRetention(input: { } for (const entry of entries) { - if (!entry.isFile() || !PROVIDER_LOG_FILE_PATTERN.test(entry.name)) continue; + if (!entry.isFile() || !input.filePattern.test(entry.name)) continue; const filePath = NodePath.join(input.directory, entry.name); try { const stat = NodeFS.statSync(filePath); @@ -352,6 +352,8 @@ function resolveOptions(filePath: string, options: EventNdjsonLogStoreOptions) { function drainPending(input: { readonly directory: string; + readonly filePrefix: string; + readonly filePattern: RegExp; readonly options: ResolvedOptions; readonly state: StoreState; readonly now: number; @@ -373,7 +375,7 @@ function drainPending(input: { let writtenBytes = 0; for (const [threadSegment, records] of recordsBySegment) { - const filePath = NodePath.join(input.directory, `${threadSegment}.log`); + const filePath = NodePath.join(input.directory, `${input.filePrefix}-${threadSegment}.log`); let sink = sinks.get(threadSegment); try { if (!sink) { @@ -412,6 +414,7 @@ function drainPending(input: { const retention = retentionDue ? enforceRetention({ directory: input.directory, + filePattern: input.filePattern, maxTotalBytes: input.options.maxTotalBytes, maxAgeMs: input.options.maxAgeMs, now: input.now, @@ -452,6 +455,9 @@ export const makeEventNdjsonLogStore = Effect.fnUntraced(function* ( ): Effect.fn.Return { const resolved = yield* resolveOptions(filePath, options); const directory = NodePath.dirname(filePath); + const filePrefix = + toSafeThreadAttachmentSegment(NodePath.parse(filePath).name) ?? "provider-events"; + const filePattern = new RegExp(`^${filePrefix}-.+\\.log(?:\\.\\d+)?$`, "u"); yield* Effect.try({ try: () => NodeFS.mkdirSync(directory, { recursive: true }), @@ -462,6 +468,7 @@ export const makeEventNdjsonLogStore = Effect.fnUntraced(function* ( const initialRetention = yield* Effect.sync(() => enforceRetention({ directory, + filePattern, maxTotalBytes: resolved.maxTotalBytes, maxAgeMs: resolved.maxAgeMs, now: initializedAt, @@ -499,6 +506,8 @@ export const makeEventNdjsonLogStore = Effect.fnUntraced(function* ( Effect.sync(() => drainPending({ directory, + filePrefix, + filePattern, options: resolved, state, now, @@ -569,6 +578,8 @@ export const makeEventNdjsonLogStore = Effect.fnUntraced(function* ( return Effect.sync(() => { const [drainResult, drainedState] = drainPending({ directory, + filePrefix, + filePattern, options: resolved, state: nextState, now, diff --git a/apps/server/src/vcs/GitVcsDriverCore.ts b/apps/server/src/vcs/GitVcsDriverCore.ts index 3598babd334..fccbcf130bb 100644 --- a/apps/server/src/vcs/GitVcsDriverCore.ts +++ b/apps/server/src/vcs/GitVcsDriverCore.ts @@ -50,7 +50,7 @@ const REVIEW_DIFF_PATCH_MAX_OUTPUT_BYTES = 120_000; const REVIEW_UNTRACKED_DIFF_MAX_OUTPUT_BYTES = 80_000; const WORKSPACE_FILES_MAX_OUTPUT_BYTES = 120_000; const STATUS_UPSTREAM_REFRESH_INTERVAL = Duration.seconds(15); -const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.minutes(1); +const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5); const STATUS_UPSTREAM_REFRESH_FAILURE_COOLDOWN = Duration.seconds(30); const STATUS_UPSTREAM_REFRESH_CACHE_CAPACITY = 2_048; @@ -112,12 +112,13 @@ class StatusRemoteRefreshCacheKey { return ( that instanceof StatusRemoteRefreshCacheKey && this.objectDir === that.objectDir && - this.remoteName === that.remoteName + this.remoteName === that.remoteName && + this.cwd === that.cwd ); } [Hash.symbol]() { - return Hash.string(`${this.objectDir}\0${this.remoteName}`); + return Hash.string(`${this.objectDir}\0${this.remoteName}\0${this.cwd}`); } } diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts index e9855e80a20..7264ef0cb32 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.test.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.test.ts @@ -865,6 +865,65 @@ describe("VcsStatusBroadcaster", () => { }), ); + it.effect("does not continue remote status work after an upstream refresh is interrupted", () => { + const state = { + remoteStatusCalls: 0, + remoteInvalidationCalls: 0, + }; + let refreshStartedDeferred: Deferred.Deferred | null = null; + let refreshInterruptedDeferred: Deferred.Deferred | null = null; + const testLayer = VcsStatusBroadcaster.layer.pipe( + Layer.provideMerge(NodeServices.layer), + Layer.provide( + Layer.mock(GitWorkflowService.GitWorkflowService)({ + localStatus: () => Effect.succeed(baseLocalStatus), + refreshStatusUpstream: () => + Effect.gen(function* () { + if (refreshStartedDeferred) { + yield* Deferred.succeed(refreshStartedDeferred, undefined); + } + return yield* Effect.never; + }).pipe( + Effect.onInterrupt(() => + refreshInterruptedDeferred + ? Deferred.succeed(refreshInterruptedDeferred, undefined).pipe(Effect.ignore) + : Effect.void, + ), + ), + remoteStatus: () => + Effect.sync(() => { + state.remoteStatusCalls += 1; + return baseRemoteStatus; + }), + invalidateRemoteStatus: () => + Effect.sync(() => { + state.remoteInvalidationCalls += 1; + }), + } satisfies Partial), + ), + ); + + return Effect.gen(function* () { + refreshStartedDeferred = yield* Deferred.make(); + refreshInterruptedDeferred = yield* Deferred.make(); + const broadcaster = yield* VcsStatusBroadcaster.VcsStatusBroadcaster; + const scope = yield* Scope.make(); + yield* broadcaster + .streamStatus( + { cwd: "/repo" }, + { automaticRemoteRefreshInterval: Effect.succeed(Duration.hours(1)) }, + ) + .pipe(Stream.runDrain, Effect.forkIn(scope)); + + yield* Deferred.await(refreshStartedDeferred); + yield* Scope.close(scope, Exit.void); + yield* Deferred.await(refreshInterruptedDeferred); + + assert.equal(state.remoteInvalidationCalls, 0); + assert.equal(state.remoteStatusCalls, 0); + }).pipe(Effect.provide(testLayer)); + }); + it.effect("stops the remote poller after the last stream subscriber disconnects", () => { const state = { currentLocalStatus: baseLocalStatus, diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.ts b/apps/server/src/vcs/VcsStatusBroadcaster.ts index cf89f77e9c8..f92157a075d 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.ts @@ -375,6 +375,9 @@ export const make = Effect.gen(function* () { options?.refreshUpstream === false ? Exit.void : yield* workflow.refreshStatusUpstream(cwd).pipe(Effect.exit); + if (Exit.isFailure(refreshExit) && Cause.hasInterrupts(refreshExit.cause)) { + return yield* Effect.failCause(refreshExit.cause); + } if (options?.refreshUpstream !== false) { yield* workflow.invalidateRemoteStatus(cwd); } From f891d526decc9adc87d0f44ea2749635c3febfa0 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 13:43:09 -0700 Subject: [PATCH 14/15] test(server): stabilize provider shutdown assertions --- apps/server/src/provider/Layers/CodexAdapter.test.ts | 2 +- apps/server/src/provider/Layers/CursorAdapter.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index 515a7c6fcbb..8fb8b4122f0 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -1227,7 +1227,7 @@ it.effect("flushes managed native logs when the adapter layer shuts down", () => yield* Scope.close(scope, Exit.void); scopeClosed = true; - const threadLogPath = NodePath.join(tempDir, "thread-logger.log"); + const threadLogPath = NodePath.join(tempDir, "provider-native-thread-logger.log"); NodeAssert.equal(NodeFS.existsSync(threadLogPath), true); const contents = NodeFS.readFileSync(threadLogPath, "utf8"); NodeAssert.match(contents, /NTIVE: .*"message":"native flush test"/); diff --git a/apps/server/src/provider/Layers/CursorAdapter.test.ts b/apps/server/src/provider/Layers/CursorAdapter.test.ts index 73dc0967622..b5b4bfbc19d 100644 --- a/apps/server/src/provider/Layers/CursorAdapter.test.ts +++ b/apps/server/src/provider/Layers/CursorAdapter.test.ts @@ -124,7 +124,7 @@ function waitForJsonLogMatch( if (requests.some(predicate)) { return requests; } - yield* Effect.yieldNow; + yield* Effect.sleep("10 millis").pipe(TestClock.withLive); } return yield* Effect.promise(() => readJsonLines(filePath)); }); From 4a4df05191fbfbf901cb93758cbeaac55c071e28 Mon Sep 17 00:00:00 2001 From: Ben Davis Date: Fri, 26 Jun 2026 13:50:39 -0700 Subject: [PATCH 15/15] fix(server): preserve active VCS cache entries --- apps/server/src/provider/Layers/EventNdjsonLogger.ts | 2 +- apps/server/src/vcs/VcsStatusBroadcaster.ts | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/apps/server/src/provider/Layers/EventNdjsonLogger.ts b/apps/server/src/provider/Layers/EventNdjsonLogger.ts index 839f28b7023..9c1ec6fc9e8 100644 --- a/apps/server/src/provider/Layers/EventNdjsonLogger.ts +++ b/apps/server/src/provider/Layers/EventNdjsonLogger.ts @@ -296,7 +296,7 @@ function enforceRetention(input: { (left, right) => left.mtimeMs - right.mtimeMs || left.filePath.localeCompare(right.filePath), )) { if (totalBytes <= input.maxTotalBytes) break; - remove(file); + if (!remove(file)) break; } return { failures, removedCount, totalBytes }; diff --git a/apps/server/src/vcs/VcsStatusBroadcaster.ts b/apps/server/src/vcs/VcsStatusBroadcaster.ts index f92157a075d..9bcbef205a5 100644 --- a/apps/server/src/vcs/VcsStatusBroadcaster.ts +++ b/apps/server/src/vcs/VcsStatusBroadcaster.ts @@ -190,6 +190,7 @@ export const make = Effect.gen(function* () { Scope.close(scope, Exit.void), ); const statusCache = new Map(); + const activeRemotePollerCwds = new Set(); const pollersRef = yield* SynchronizedRef.make(new Map()); const setCachedStatus = (cwd: string, status: CachedVcsStatus) => { @@ -197,7 +198,13 @@ export const make = Effect.gen(function* () { statusCache.set(cwd, status); while (statusCache.size > VCS_STATUS_CACHE_CAPACITY) { - const oldestCwd = statusCache.keys().next().value; + let oldestCwd: string | undefined; + for (const cachedCwd of statusCache.keys()) { + if (!activeRemotePollerCwds.has(cachedCwd)) { + oldestCwd = cachedCwd; + break; + } + } if (oldestCwd === undefined) { break; } @@ -489,6 +496,7 @@ export const make = Effect.gen(function* () { return makeRemoteRefreshLoop(cwd, automaticRemoteRefreshInterval, refreshImmediately).pipe( Effect.forkIn(broadcasterScope), Effect.map((fiber) => { + activeRemotePollerCwds.add(cwd); const nextPollers = new Map(activePollers); nextPollers.set(cwd, { fiber, @@ -531,6 +539,7 @@ export const make = Effect.gen(function* () { } return Effect.sync(() => { + activeRemotePollerCwds.delete(cwd); statusCache.delete(cwd); return [undefined, activePollers] as const; });