diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index d15b2efa2..a805e65d2 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -1735,6 +1735,7 @@ it.effect("restores pending turn-start metadata across projection pipeline resta yield* Effect.gen(function* () { const eventStore = yield* OrchestrationEventStore; const projectionPipeline = yield* OrchestrationProjectionPipeline; + const sql = yield* SqlClient.SqlClient; yield* eventStore.append({ type: "thread.turn-start-requested", @@ -1755,6 +1756,26 @@ it.effect("restores pending turn-start metadata across projection pipeline resta }); yield* projectionPipeline.bootstrap; + + const sessionRows = yield* sql<{ + readonly status: string; + readonly runtimeMode: string; + readonly providerName: string | null; + }>` + SELECT + status, + runtime_mode AS "runtimeMode", + provider_name AS "providerName" + FROM projection_thread_sessions + WHERE thread_id = ${threadId} + `; + assert.deepEqual(sessionRows, [ + { + status: "starting", + runtimeMode: "approval-required", + providerName: null, + }, + ]); }).pipe(Effect.provide(firstProjectionLayer)); const turnRows = yield* Effect.gen(function* () { diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 6ae94105a..2af3ada21 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -751,18 +751,40 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { _attachmentSideEffects, ) => Effect.gen(function* () { - if (event.type !== "thread.session-set") { - return; + switch (event.type) { + case "thread.turn-start-requested": { + const existingRow = yield* projectionThreadSessionRepository.getByThreadId({ + threadId: event.payload.threadId, + }); + yield* projectionThreadSessionRepository.upsert({ + threadId: event.payload.threadId, + status: "starting", + providerName: + event.payload.provider ?? + (Option.isSome(existingRow) ? existingRow.value.providerName : null), + runtimeMode: event.payload.runtimeMode, + activeTurnId: null, + lastError: null, + updatedAt: event.payload.createdAt, + }); + return; + } + + case "thread.session-set": + yield* projectionThreadSessionRepository.upsert({ + threadId: event.payload.threadId, + status: event.payload.session.status, + providerName: event.payload.session.providerName, + runtimeMode: event.payload.session.runtimeMode, + activeTurnId: event.payload.session.activeTurnId, + lastError: event.payload.session.lastError, + updatedAt: event.payload.session.updatedAt, + }); + return; + + default: + return; } - yield* projectionThreadSessionRepository.upsert({ - threadId: event.payload.threadId, - status: event.payload.session.status, - providerName: event.payload.session.providerName, - runtimeMode: event.payload.session.runtimeMode, - activeTurnId: event.payload.session.activeTurnId, - lastError: event.payload.session.lastError, - updatedAt: event.payload.session.updatedAt, - }); }); const applyThreadTurnsProjection: ProjectorDefinition["apply"] = ( diff --git a/apps/server/src/orchestration/projector.test.ts b/apps/server/src/orchestration/projector.test.ts index 71f5b6bd4..ad989d65e 100644 --- a/apps/server/src/orchestration/projector.test.ts +++ b/apps/server/src/orchestration/projector.test.ts @@ -214,6 +214,72 @@ describe("orchestration projector", () => { expect(thread?.session?.status).toBe("running"); }); + it("marks a thread session as starting when a turn is requested", async () => { + const createdAt = "2026-02-23T08:00:00.000Z"; + const requestedAt = "2026-02-23T08:00:02.000Z"; + const model = createEmptyReadModel(createdAt); + + const afterCreate = await Effect.runPromise( + projectEvent( + model, + makeEvent({ + sequence: 1, + type: "thread.created", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: createdAt, + commandId: "cmd-create", + payload: { + threadId: "thread-1", + projectId: "project-1", + title: "demo", + model: "gpt-5.3-codex", + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }), + ), + ); + + const afterTurnRequested = await Effect.runPromise( + projectEvent( + afterCreate, + makeEvent({ + sequence: 2, + type: "thread.turn-start-requested", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: requestedAt, + commandId: "cmd-turn-requested", + payload: { + threadId: "thread-1", + messageId: "message-1", + provider: "codex", + runtimeMode: "approval-required", + interactionMode: "plan", + assistantDeliveryMode: "streaming", + createdAt: requestedAt, + }, + }), + ), + ); + + const thread = afterTurnRequested.threads[0]; + expect(thread?.session).toEqual({ + threadId: "thread-1", + status: "starting", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + lastError: null, + updatedAt: requestedAt, + }); + expect(thread?.latestTurn).toBeNull(); + }); + it("updates canonical thread runtime mode from thread.runtime-mode-set", async () => { const createdAt = "2026-02-23T08:00:00.000Z"; const updatedAt = "2026-02-23T08:00:05.000Z"; diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 015f82a67..20de71cf6 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -23,6 +23,7 @@ import { ThreadRevertedPayload, ThreadSessionSetPayload, ThreadTurnDiffCompletedPayload, + ThreadTurnStartRequestedPayload, } from "./Schemas.ts"; type ThreadPatch = Partial>; @@ -391,6 +392,38 @@ export function projectEvent( }; }); + case "thread.turn-start-requested": + return Effect.gen(function* () { + const payload = yield* decodeForEvent( + ThreadTurnStartRequestedPayload, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const session: OrchestrationSession = { + threadId: payload.threadId, + status: "starting", + providerName: payload.provider ?? thread.session?.providerName ?? null, + runtimeMode: payload.runtimeMode, + activeTurnId: null, + lastError: null, + updatedAt: payload.createdAt, + }; + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + session, + updatedAt: event.occurredAt, + }), + }; + }); + case "thread.session-set": return Effect.gen(function* () { const payload = yield* decodeForEvent( diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index 7b31dbdf3..dd3fe801f 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -313,6 +313,20 @@ function addThreadToSnapshot( }; } +function mapThreadInSnapshot( + snapshot: OrchestrationReadModel, + threadId: ThreadId, + mapper: ( + thread: OrchestrationReadModel["threads"][number], + ) => OrchestrationReadModel["threads"][number], +): OrchestrationReadModel { + return { + ...snapshot, + snapshotSequence: snapshot.snapshotSequence + 1, + threads: snapshot.threads.map((thread) => (thread.id === threadId ? mapper(thread) : thread)), + }; +} + function createDraftOnlySnapshot(): OrchestrationReadModel { const snapshot = createSnapshotForTargetUser({ targetMessageId: "msg-user-draft-target" as MessageId, @@ -1201,6 +1215,69 @@ describe("ChatView timeline estimator parity (full app)", () => { } }); + it("re-enables sending after a turn completes without an observed running phase", async () => { + useComposerDraftStore.getState().setPrompt(THREAD_ID, "first prompt"); + + const mounted = await mountChatView({ + viewport: DEFAULT_VIEWPORT, + snapshot: createSnapshotForTargetUser({ + targetMessageId: "msg-user-fast-complete" as MessageId, + targetText: "fast complete target", + }), + }); + + try { + const sendButton = await waitForSendButton(); + expect(sendButton.disabled).toBe(false); + sendButton.click(); + + const completedSnapshot = mapThreadInSnapshot(fixture.snapshot, THREAD_ID, (thread) => ({ + ...thread, + latestTurn: { + turnId: "turn-fast-complete" as never, + state: "completed", + requestedAt: isoAt(2_000), + startedAt: isoAt(2_001), + completedAt: isoAt(2_003), + assistantMessageId: null, + }, + session: { + ...(thread.session ?? { + threadId: THREAD_ID, + status: "ready", + providerName: "codex", + runtimeMode: "full-access", + activeTurnId: null, + lastError: null, + updatedAt: NOW_ISO, + }), + status: "ready", + activeTurnId: null, + lastError: null, + updatedAt: isoAt(2_003), + }, + updatedAt: isoAt(2_003), + })); + fixture.snapshot = completedSnapshot; + useStore.getState().syncServerReadModel(completedSnapshot); + + useComposerDraftStore.getState().setPrompt(THREAD_ID, "second prompt"); + + await vi.waitFor( + () => { + const nextSendButton = document.querySelector( + 'button[aria-label="Send message"]', + ); + expect(nextSendButton).toBeTruthy(); + expect(nextSendButton?.disabled).toBe(false); + }, + { timeout: 8_000, interval: 16 }, + ); + } finally { + await mounted.cleanup(); + } + }); + it("shows a pointer cursor for the running stop button", async () => { const mounted = await mountChatView({ viewport: DEFAULT_VIEWPORT, diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 77cdb0ea1..31dca173a 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -55,6 +55,7 @@ import { deriveActivePlanState, findLatestProposedPlan, deriveWorkLogEntries, + hasLatestTurnObservationSince, hasToolActivityForTurn, isLatestTurnSettled, formatElapsed, @@ -2051,6 +2052,7 @@ export default function ChatView({ threadId }: ChatViewProps) { } if ( phase === "running" || + hasLatestTurnObservationSince(activeLatestTurn, sendStartedAt) || activePendingApproval !== null || activePendingUserInput !== null || activeThread?.error @@ -2060,9 +2062,11 @@ export default function ChatView({ threadId }: ChatViewProps) { }, [ activePendingApproval, activePendingUserInput, + activeLatestTurn, activeThread?.error, phase, resetSendPhase, + sendStartedAt, sendPhase, ]); diff --git a/apps/web/src/session-logic.test.ts b/apps/web/src/session-logic.test.ts index 74ba3a814..108a5ffbb 100644 --- a/apps/web/src/session-logic.test.ts +++ b/apps/web/src/session-logic.test.ts @@ -10,6 +10,7 @@ import { deriveTimelineEntries, deriveWorkLogEntries, findLatestProposedPlan, + hasLatestTurnObservationSince, hasToolActivityForTurn, isLatestTurnSettled, } from "./session-logic"; @@ -579,6 +580,7 @@ describe("hasToolActivityForTurn", () => { describe("isLatestTurnSettled", () => { const latestTurn = { turnId: TurnId.makeUnsafe("turn-1"), + requestedAt: "2026-02-27T21:10:00.000Z", startedAt: "2026-02-27T21:10:00.000Z", completedAt: "2026-02-27T21:10:06.000Z", } as const; @@ -615,6 +617,7 @@ describe("isLatestTurnSettled", () => { isLatestTurnSettled( { turnId: TurnId.makeUnsafe("turn-1"), + requestedAt: "2026-02-27T21:10:00.000Z", startedAt: null, completedAt: "2026-02-27T21:10:06.000Z", }, @@ -627,6 +630,7 @@ describe("isLatestTurnSettled", () => { describe("deriveActiveWorkStartedAt", () => { const latestTurn = { turnId: TurnId.makeUnsafe("turn-1"), + requestedAt: "2026-02-27T21:10:00.000Z", startedAt: "2026-02-27T21:10:00.000Z", completedAt: "2026-02-27T21:10:06.000Z", } as const; @@ -662,6 +666,7 @@ describe("deriveActiveWorkStartedAt", () => { deriveActiveWorkStartedAt( { turnId: TurnId.makeUnsafe("turn-1"), + requestedAt: "2026-02-27T21:10:00.000Z", startedAt: "2026-02-27T21:10:00.000Z", completedAt: "2026-02-27T21:10:06.000Z", }, @@ -672,6 +677,36 @@ describe("deriveActiveWorkStartedAt", () => { }); }); +describe("hasLatestTurnObservationSince", () => { + it("returns true when a completed turn was observed after send started", () => { + expect( + hasLatestTurnObservationSince( + { + turnId: TurnId.makeUnsafe("turn-2"), + requestedAt: "2026-02-27T21:11:00.000Z", + startedAt: "2026-02-27T21:11:01.000Z", + completedAt: "2026-02-27T21:11:03.000Z", + }, + "2026-02-27T21:10:59.000Z", + ), + ).toBe(true); + }); + + it("returns false for an older completed turn from before the current send", () => { + expect( + hasLatestTurnObservationSince( + { + turnId: TurnId.makeUnsafe("turn-1"), + requestedAt: "2026-02-27T21:10:00.000Z", + startedAt: "2026-02-27T21:10:00.000Z", + completedAt: "2026-02-27T21:10:06.000Z", + }, + "2026-02-27T21:11:00.000Z", + ), + ).toBe(false); + }); +}); + describe("PROVIDER_OPTIONS", () => { it("keeps Claude Code and Cursor visible as unavailable placeholders in the stack base", () => { const claude = PROVIDER_OPTIONS.find((option) => option.value === "claudeCode"); diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index e389f10e2..4c08562f9 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -116,7 +116,10 @@ export function formatElapsed(startIso: string, endIso: string | undefined): str return formatDuration(endedAt - startedAt); } -type LatestTurnTiming = Pick; +type LatestTurnTiming = Pick< + OrchestrationLatestTurn, + "turnId" | "requestedAt" | "startedAt" | "completedAt" +>; type SessionActivityState = Pick; export function isLatestTurnSettled( @@ -141,6 +144,30 @@ export function deriveActiveWorkStartedAt( return sendStartedAt; } +export function hasLatestTurnObservationSince( + latestTurn: LatestTurnTiming | null, + sendStartedAt: string | null, +): boolean { + if (!latestTurn || !sendStartedAt) { + return false; + } + + const sendStartedAtMs = Date.parse(sendStartedAt); + if (Number.isNaN(sendStartedAtMs)) { + return false; + } + + return [latestTurn.requestedAt, latestTurn.startedAt, latestTurn.completedAt].some( + (timestamp) => { + if (!timestamp) { + return false; + } + const timestampMs = Date.parse(timestamp); + return !Number.isNaN(timestampMs) && timestampMs >= sendStartedAtMs; + }, + ); +} + function requestKindFromRequestType(requestType: unknown): PendingApproval["requestKind"] | null { switch (requestType) { case "command_execution_approval":