diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index e34734d4..84fd4d44 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -49,6 +49,12 @@ import { type ProjectionThreadCheckpointContext, type ProjectionSnapshotQueryShape, } from "../Services/ProjectionSnapshotQuery.ts"; +import { + deriveLatestUserMessageAt, + hasActionableProposedPlanSignal, + hasPendingApprovalsSignal, + hasPendingUserInputSignal, +} from "@t3tools/shared/threadSignals"; const decodeReadModel = Schema.decodeUnknownEffect(OrchestrationReadModel); const ProjectionProjectDbRowSchema = ProjectionProject.mapFields( @@ -251,6 +257,7 @@ function toProjectedThread(input: { readonly session: OrchestrationSession | null; }): OrchestrationThread { const { threadRow } = input; + const latestUserMessageAt = deriveLatestUserMessageAt(input.messages); return { id: threadRow.threadId, projectId: threadRow.projectId, @@ -272,6 +279,13 @@ function toProjectedThread(input: { latestTurn: input.latestTurn, createdAt: threadRow.createdAt, updatedAt: threadRow.updatedAt, + latestUserMessageAt, + hasPendingApprovals: hasPendingApprovalsSignal(input.activities), + hasPendingUserInput: hasPendingUserInputSignal(input.activities), + hasActionableProposedPlan: hasActionableProposedPlanSignal( + input.proposedPlans, + input.latestTurn, + ), archivedAt: threadRow.archivedAt ?? null, deletedAt: threadRow.deletedAt, handoff: threadRow.handoff, diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 6475e407..bad8e301 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -336,6 +336,53 @@ describe("ProviderCommandReactor", () => { expect(thread?.session?.runtimeMode).toBe("approval-required"); }); + it("starts a fresh provider session when the projected session is stale after restart", async () => { + const harness = await createHarness(); + const now = new Date().toISOString(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.session.set", + commandId: CommandId.makeUnsafe("cmd-session-stale-projected"), + threadId: ThreadId.makeUnsafe("thread-1"), + session: { + threadId: ThreadId.makeUnsafe("thread-1"), + status: "ready", + providerName: "codex", + runtimeMode: "approval-required", + activeTurnId: null, + lastError: null, + updatedAt: now, + }, + createdAt: now, + }), + ); + + harness.startSession.mockClear(); + harness.sendTurn.mockClear(); + + await Effect.runPromise( + harness.engine.dispatch({ + type: "thread.turn.start", + commandId: CommandId.makeUnsafe("cmd-turn-start-after-stale-session"), + threadId: ThreadId.makeUnsafe("thread-1"), + message: { + messageId: asMessageId("user-message-stale-session"), + role: "user", + text: "resume after backend restart", + attachments: [], + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: "approval-required", + createdAt: now, + }), + ); + + await waitFor(() => harness.startSession.mock.calls.length === 1); + await waitFor(() => harness.sendTurn.mock.calls.length === 1); + expect(harness.startSession.mock.calls[0]?.[0]).toEqual(ThreadId.makeUnsafe("thread-1")); + }); + it("renames a generic first-turn thread title using text generation", async () => { const harness = await createHarness(); const now = new Date().toISOString(); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index 8d0ac3e0..bf2b24cc 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -388,14 +388,15 @@ const make = Effect.gen(function* () { createdAt, }); + const activeSession = thread.session ? yield* resolveActiveSession(thread.id) : undefined; + // Only trust the projected session if the provider runtime still has a live session. const existingSessionThreadId = - thread.session && thread.session.status !== "stopped" ? thread.id : null; + thread.session && thread.session.status !== "stopped" && activeSession ? thread.id : null; if (existingSessionThreadId) { const runtimeModeChanged = thread.runtimeMode !== thread.session?.runtimeMode; const providerChanged = requestedModelSelection !== undefined && requestedModelSelection.provider !== currentProvider; - const activeSession = yield* resolveActiveSession(existingSessionThreadId); const sessionModelSwitch = currentProvider === undefined ? "in-session" diff --git a/apps/web/src/components/ChatView.logic.ts b/apps/web/src/components/ChatView.logic.ts index 4c5242cc..b6b2529e 100644 --- a/apps/web/src/components/ChatView.logic.ts +++ b/apps/web/src/components/ChatView.logic.ts @@ -44,6 +44,10 @@ export function buildLocalDraftThread( error, createdAt: draftThread.createdAt, latestTurn: null, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, lastVisitedAt: draftThread.createdAt, branch: draftThread.branch, worktreePath: draftThread.worktreePath, diff --git a/apps/web/src/components/Sidebar.logic.test.ts b/apps/web/src/components/Sidebar.logic.test.ts index a62af584..d6cf5f64 100644 --- a/apps/web/src/components/Sidebar.logic.test.ts +++ b/apps/web/src/components/Sidebar.logic.test.ts @@ -186,6 +186,10 @@ describe("pin helpers", () => { error: null, createdAt: "2026-03-09T10:00:00.000Z", latestTurn: null, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, turnDiffSummaries: [], activities: [], branch: null, @@ -784,6 +788,10 @@ function makeThread(overrides: Partial = {}): Thread { createdAt: "2026-03-09T10:00:00.000Z", updatedAt: "2026-03-09T10:00:00.000Z", latestTurn: null, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, branch: null, worktreePath: null, turnDiffSummaries: [], diff --git a/apps/web/src/focusedChatContext.test.ts b/apps/web/src/focusedChatContext.test.ts index e02c75cd..0f47e359 100644 --- a/apps/web/src/focusedChatContext.test.ts +++ b/apps/web/src/focusedChatContext.test.ts @@ -48,6 +48,10 @@ function makeThread(threadId: ThreadId, overrides: Partial = {}): Thread assistantMessageId: null, sourceProposedPlan: undefined, }, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, lastVisitedAt: "2026-04-07T10:01:00.000Z", branch: null, worktreePath: null, diff --git a/apps/web/src/notifications/taskCompletion.logic.test.ts b/apps/web/src/notifications/taskCompletion.logic.test.ts index be3b8323..2893f954 100644 --- a/apps/web/src/notifications/taskCompletion.logic.test.ts +++ b/apps/web/src/notifications/taskCompletion.logic.test.ts @@ -44,6 +44,10 @@ function makeThread(overrides: Partial): Thread { completedAt: null, assistantMessageId: null, }, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, lastVisitedAt: "2026-04-05T10:00:00.000Z", branch: null, worktreePath: null, diff --git a/apps/web/src/routes/__root.tsx b/apps/web/src/routes/__root.tsx index f997aed2..23802082 100644 --- a/apps/web/src/routes/__root.tsx +++ b/apps/web/src/routes/__root.tsx @@ -1,4 +1,4 @@ -import { ThreadId } from "@t3tools/contracts"; +import { ThreadId, type OrchestrationEvent } from "@t3tools/contracts"; import { defaultTerminalTitleForCliKind } from "@t3tools/shared/terminalThreads"; import { Outlet, @@ -144,8 +144,46 @@ function wait(ms: number): Promise { return new Promise((resolve) => window.setTimeout(resolve, ms)); } +function coalesceOrchestrationUiEvents( + events: ReadonlyArray, +): OrchestrationEvent[] { + if (events.length < 2) { + return [...events]; + } + + const coalesced: OrchestrationEvent[] = []; + for (const event of events) { + const previous = coalesced.at(-1); + if ( + previous?.type === "thread.message-sent" && + event.type === "thread.message-sent" && + previous.payload.threadId === event.payload.threadId && + previous.payload.messageId === event.payload.messageId + ) { + coalesced[coalesced.length - 1] = { + ...event, + payload: { + ...event.payload, + attachments: event.payload.attachments ?? previous.payload.attachments, + createdAt: previous.payload.createdAt, + text: + !event.payload.streaming && event.payload.text.length > 0 + ? event.payload.text + : previous.payload.text + event.payload.text, + }, + }; + continue; + } + + coalesced.push(event); + } + + return coalesced; +} + function EventRouter() { const syncServerReadModel = useStore((store) => store.syncServerReadModel); + const applyOrchestrationEvents = useStore((store) => store.applyOrchestrationEvents); const setProjectExpanded = useStore((store) => store.setProjectExpanded); const removeOrphanedTerminalStates = useTerminalStateStore( (store) => store.removeOrphanedTerminalStates, @@ -168,6 +206,7 @@ function EventRouter() { let syncing = false; let pending = false; let needsProviderInvalidation = false; + let pendingDomainEvents: OrchestrationEvent[] = []; const removeOrphanedTerminalsForCurrentState = () => { const draftThreadIds = Object.keys( @@ -234,6 +273,10 @@ function EventRouter() { const domainEventFlushThrottler = new Throttler( () => { + if (pendingDomainEvents.length > 0) { + applyOrchestrationEvents(coalesceOrchestrationUiEvents(pendingDomainEvents)); + pendingDomainEvents = []; + } if (needsProviderInvalidation) { needsProviderInvalidation = false; void queryClient.invalidateQueries({ queryKey: providerQueryKeys.all }); @@ -255,25 +298,10 @@ function EventRouter() { return; } latestSequence = event.sequence; + pendingDomainEvents.push(event); if (event.type === "thread.turn-diff-completed" || event.type === "thread.reverted") { needsProviderInvalidation = true; } - if (event.type === "thread.turn-diff-completed") { - useStore.getState().applyThreadTurnDiffCompleted(event.payload.threadId, { - turnId: event.payload.turnId, - completedAt: event.payload.completedAt, - status: event.payload.status, - files: event.payload.files.map((file) => ({ - path: file.path, - ...(file.kind !== undefined ? { kind: file.kind } : {}), - ...(file.additions !== undefined ? { additions: file.additions } : {}), - ...(file.deletions !== undefined ? { deletions: file.deletions } : {}), - })), - checkpointRef: event.payload.checkpointRef, - assistantMessageId: event.payload.assistantMessageId ?? undefined, - checkpointTurnCount: event.payload.checkpointTurnCount, - }); - } domainEventFlushThrottler.maybeExecute(); }); const unsubTerminalEvent = api.terminal.onEvent((event) => { @@ -378,6 +406,7 @@ function EventRouter() { unsubServerConfigUpdated(); }; }, [ + applyOrchestrationEvents, navigate, queryClient, removeOrphanedTerminalStates, diff --git a/apps/web/src/session-logic.ts b/apps/web/src/session-logic.ts index 6b39e056..78199f13 100644 --- a/apps/web/src/session-logic.ts +++ b/apps/web/src/session-logic.ts @@ -16,6 +16,12 @@ import { decodeSubagentReceiverAgents, decodeSubagentReceiverThreadIds, } from "@t3tools/shared/subagents"; +import { + compareThreadActivitiesByOrder, + derivePendingApprovalSignals, + derivePendingUserInputSignals, + requestKindFromActivityPayload, +} from "@t3tools/shared/threadSignals"; import { deriveReadableToolTitle, normalizeCompactToolLabel } from "./lib/toolCallLabel"; import type { @@ -204,187 +210,16 @@ export function deriveActiveWorkStartedAt( return sendStartedAt; } -function requestKindFromRequestType(requestType: unknown): PendingApproval["requestKind"] | null { - switch (requestType) { - case "command_execution_approval": - case "exec_command_approval": - return "command"; - case "file_read_approval": - return "file-read"; - case "file_change_approval": - case "apply_patch_approval": - return "file-change"; - default: - return null; - } -} - -function isStalePendingRequestFailureDetail(detail: string | undefined): boolean { - const normalized = detail?.toLowerCase(); - if (!normalized) { - return false; - } - return ( - normalized.includes("stale pending approval request") || - normalized.includes("stale pending user-input request") || - normalized.includes("unknown pending approval request") || - normalized.includes("unknown pending permission request") || - normalized.includes("unknown pending user-input request") - ); -} - export function derivePendingApprovals( activities: ReadonlyArray, ): PendingApproval[] { - const openByRequestId = new Map(); - const ordered = [...activities].toSorted(compareActivitiesByOrder); - - for (const activity of ordered) { - const payload = - activity.payload && typeof activity.payload === "object" - ? (activity.payload as Record) - : null; - const requestId = - payload && typeof payload.requestId === "string" - ? ApprovalRequestId.makeUnsafe(payload.requestId) - : null; - const requestKind = - payload && - (payload.requestKind === "command" || - payload.requestKind === "file-read" || - payload.requestKind === "file-change") - ? payload.requestKind - : payload - ? requestKindFromRequestType(payload.requestType) - : null; - const detail = payload && typeof payload.detail === "string" ? payload.detail : undefined; - - if (activity.kind === "approval.requested" && requestId && requestKind) { - openByRequestId.set(requestId, { - requestId, - requestKind, - createdAt: activity.createdAt, - ...(detail ? { detail } : {}), - }); - continue; - } - - if (activity.kind === "approval.resolved" && requestId) { - openByRequestId.delete(requestId); - continue; - } - - if ( - activity.kind === "provider.approval.respond.failed" && - requestId && - isStalePendingRequestFailureDetail(detail) - ) { - openByRequestId.delete(requestId); - continue; - } - } - - return [...openByRequestId.values()].toSorted((left, right) => - left.createdAt.localeCompare(right.createdAt), - ); -} - -function parseUserInputQuestions( - payload: Record | null, -): ReadonlyArray | null { - const questions = payload?.questions; - if (!Array.isArray(questions)) { - return null; - } - const parsed = questions - .map((entry) => { - if (!entry || typeof entry !== "object") return null; - const question = entry as Record; - if ( - typeof question.id !== "string" || - typeof question.header !== "string" || - typeof question.question !== "string" || - !Array.isArray(question.options) - ) { - return null; - } - const options = question.options - .map((option) => { - if (!option || typeof option !== "object") return null; - const optionRecord = option as Record; - if ( - typeof optionRecord.label !== "string" || - typeof optionRecord.description !== "string" - ) { - return null; - } - return { - label: optionRecord.label, - description: optionRecord.description, - }; - }) - .filter((option): option is UserInputQuestion["options"][number] => option !== null); - if (options.length === 0) { - return null; - } - return { - id: question.id, - header: question.header, - question: question.question, - options, - }; - }) - .filter((question): question is UserInputQuestion => question !== null); - return parsed.length > 0 ? parsed : null; + return [...derivePendingApprovalSignals(activities)]; } export function derivePendingUserInputs( activities: ReadonlyArray, ): PendingUserInput[] { - const openByRequestId = new Map(); - const ordered = [...activities].toSorted(compareActivitiesByOrder); - - for (const activity of ordered) { - const payload = - activity.payload && typeof activity.payload === "object" - ? (activity.payload as Record) - : null; - const requestId = - payload && typeof payload.requestId === "string" - ? ApprovalRequestId.makeUnsafe(payload.requestId) - : null; - const detail = payload && typeof payload.detail === "string" ? payload.detail : undefined; - - if (activity.kind === "user-input.requested" && requestId) { - const questions = parseUserInputQuestions(payload); - if (!questions) { - continue; - } - openByRequestId.set(requestId, { - requestId, - createdAt: activity.createdAt, - questions, - }); - continue; - } - - if (activity.kind === "user-input.resolved" && requestId) { - openByRequestId.delete(requestId); - continue; - } - - if ( - activity.kind === "provider.user-input.respond.failed" && - requestId && - isStalePendingRequestFailureDetail(detail) - ) { - openByRequestId.delete(requestId); - } - } - - return [...openByRequestId.values()].toSorted((left, right) => - left.createdAt.localeCompare(right.createdAt), - ); + return [...derivePendingUserInputSignals(activities)]; } export function deriveActivePlanState( @@ -1156,7 +991,7 @@ function extractWorkLogRequestKind( ) { return payload.requestKind; } - return requestKindFromRequestType(payload?.requestType) ?? undefined; + return requestKindFromActivityPayload(payload) ?? undefined; } function pushChangedFile(target: string[], seen: Set, value: unknown) { @@ -1223,46 +1058,7 @@ function extractChangedFiles(payload: Record | null): string[] return changedFiles; } -function compareActivitiesByOrder( - left: OrchestrationThreadActivity, - right: OrchestrationThreadActivity, -): number { - if (left.sequence !== undefined && right.sequence !== undefined) { - if (left.sequence !== right.sequence) { - return left.sequence - right.sequence; - } - } else if (left.sequence !== undefined) { - return 1; - } else if (right.sequence !== undefined) { - return -1; - } - - const createdAtComparison = left.createdAt.localeCompare(right.createdAt); - if (createdAtComparison !== 0) { - return createdAtComparison; - } - - const lifecycleRankComparison = - compareActivityLifecycleRank(left.kind) - compareActivityLifecycleRank(right.kind); - if (lifecycleRankComparison !== 0) { - return lifecycleRankComparison; - } - - return left.id.localeCompare(right.id); -} - -function compareActivityLifecycleRank(kind: string): number { - if (kind.endsWith(".started") || kind === "tool.started") { - return 0; - } - if (kind.endsWith(".progress") || kind.endsWith(".updated")) { - return 1; - } - if (kind.endsWith(".completed") || kind.endsWith(".resolved")) { - return 2; - } - return 1; -} +const compareActivitiesByOrder = compareThreadActivitiesByOrder; export function hasToolActivityForTurn( activities: ReadonlyArray, diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index c13eae2c..302b9c5f 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -1,13 +1,19 @@ import { - DEFAULT_MODEL_BY_PROVIDER, + CheckpointRef, + EventId, + MessageId, + OrchestrationProposedPlanId, ProjectId, ThreadId, TurnId, + type OrchestrationEvent, type OrchestrationReadModel, + type OrchestrationThreadActivity, } from "@t3tools/contracts"; import { describe, expect, it, vi } from "vitest"; import { + applyOrchestrationEvents, collapseProjectsExcept, markThreadUnread, renameProjectLocally, @@ -38,6 +44,10 @@ function makeThread(overrides: Partial = {}): Thread { error: null, createdAt: "2026-02-13T00:00:00.000Z", latestTurn: null, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, envMode: "local", branch: null, worktreePath: null, @@ -147,6 +157,50 @@ function makeReadModelProject( }; } +function makeDomainEvent( + type: TType, + payload: Extract["payload"], + overrides: Partial, "type" | "payload">> = {}, +): Extract { + const aggregateId = "threadId" in payload ? payload.threadId : ProjectId.makeUnsafe("project-1"); + return { + type, + payload, + sequence: overrides.sequence ?? 1, + eventId: overrides.eventId ?? EventId.makeUnsafe(`event-${crypto.randomUUID()}`), + aggregateKind: overrides.aggregateKind ?? "thread", + aggregateId, + occurredAt: overrides.occurredAt ?? "2026-02-27T00:00:00.000Z", + commandId: overrides.commandId ?? null, + causationEventId: overrides.causationEventId ?? null, + correlationId: overrides.correlationId ?? null, + metadata: overrides.metadata ?? {}, + ...overrides, + } as Extract; +} + +function makeActivity(overrides: { + id?: string; + createdAt?: string; + kind?: string; + summary?: string; + tone?: OrchestrationThreadActivity["tone"]; + payload?: Record; + turnId?: string; + sequence?: number; +}): OrchestrationThreadActivity { + return { + id: EventId.makeUnsafe(overrides.id ?? crypto.randomUUID()), + createdAt: overrides.createdAt ?? "2026-02-23T00:00:00.000Z", + kind: overrides.kind ?? "tool.started", + summary: overrides.summary ?? "Tool call", + tone: overrides.tone ?? "tool", + payload: overrides.payload ?? {}, + turnId: overrides.turnId ? TurnId.makeUnsafe(overrides.turnId) : null, + ...(overrides.sequence !== undefined ? { sequence: overrides.sequence } : {}), + }; +} + describe("store pure functions", () => { it("markThreadUnread moves lastVisitedAt before completion for a completed thread", () => { const latestTurnCompletedAt = "2026-02-25T12:30:00.000Z"; @@ -663,4 +717,559 @@ describe("store read model sync", () => { expect(next.threads[0]).toBe(thread); }); + + it("uses server-computed sidebar summary signals from the read model", () => { + const next = syncServerReadModel( + { + projects: [], + threads: [], + sidebarThreadSummaryById: {}, + threadsHydrated: false, + }, + makeReadModel( + makeReadModelThread({ + latestUserMessageAt: "2026-02-27T00:05:00.000Z", + hasPendingApprovals: true, + hasPendingUserInput: true, + hasActionableProposedPlan: true, + updatedAt: "2026-02-27T00:10:00.000Z", + }), + ), + ); + + expect(next.threads[0]).toMatchObject({ + latestUserMessageAt: "2026-02-27T00:05:00.000Z", + hasPendingApprovals: true, + hasPendingUserInput: true, + hasActionableProposedPlan: true, + }); + expect(next.sidebarThreadSummaryById["thread-1"]).toMatchObject({ + latestUserMessageAt: "2026-02-27T00:05:00.000Z", + hasPendingApprovals: true, + hasPendingUserInput: true, + hasActionableProposedPlan: true, + }); + }); +}); + +describe("live orchestration event application", () => { + it("merges assistant message chunks and completes the latest turn without waiting for a snapshot", () => { + const turnId = TurnId.makeUnsafe("turn-1"); + const messageId = MessageId.makeUnsafe("message-1"); + const initialState = makeState(makeThread()); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.message-sent", { + threadId: ThreadId.makeUnsafe("thread-1"), + messageId, + role: "assistant", + text: "Hel", + turnId, + streaming: true, + source: "native", + createdAt: "2026-02-27T00:00:01.000Z", + updatedAt: "2026-02-27T00:00:01.000Z", + }), + makeDomainEvent("thread.message-sent", { + threadId: ThreadId.makeUnsafe("thread-1"), + messageId, + role: "assistant", + text: "lo", + turnId, + streaming: true, + source: "native", + createdAt: "2026-02-27T00:00:01.000Z", + updatedAt: "2026-02-27T00:00:02.000Z", + }), + makeDomainEvent("thread.message-sent", { + threadId: ThreadId.makeUnsafe("thread-1"), + messageId, + role: "assistant", + text: "Hello", + turnId, + streaming: false, + source: "native", + createdAt: "2026-02-27T00:00:01.000Z", + updatedAt: "2026-02-27T00:00:03.000Z", + }), + ]); + + expect(next.threads[0]?.messages).toEqual([ + { + id: messageId, + role: "assistant", + text: "Hello", + turnId, + createdAt: "2026-02-27T00:00:01.000Z", + completedAt: "2026-02-27T00:00:03.000Z", + streaming: false, + source: "native", + }, + ]); + expect(next.threads[0]?.latestTurn).toEqual({ + turnId, + state: "completed", + requestedAt: "2026-02-27T00:00:01.000Z", + startedAt: "2026-02-27T00:00:01.000Z", + completedAt: "2026-02-27T00:00:03.000Z", + assistantMessageId: messageId, + }); + expect(next.sidebarThreadSummaryById["thread-1"]?.latestTurn?.state).toBe("completed"); + }); + + it("updates latest user message timestamps from live user messages", () => { + const initialState = makeState(makeThread()); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.message-sent", { + threadId: ThreadId.makeUnsafe("thread-1"), + messageId: MessageId.makeUnsafe("user-message-1"), + role: "user", + text: "Run this with Claude", + turnId: null, + streaming: false, + source: "native", + createdAt: "2026-02-27T00:05:00.000Z", + updatedAt: "2026-02-27T00:05:00.000Z", + }), + ]); + + expect(next.threads[0]?.latestUserMessageAt).toBe("2026-02-27T00:05:00.000Z"); + expect(next.sidebarThreadSummaryById["thread-1"]?.latestUserMessageAt).toBe( + "2026-02-27T00:05:00.000Z", + ); + }); + + it("updates pending approval flags from live activity events", () => { + const initialState = makeState(makeThread()); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.activity-appended", { + threadId: ThreadId.makeUnsafe("thread-1"), + activity: makeActivity({ + id: "approval-open", + createdAt: "2026-02-27T00:06:00.000Z", + kind: "approval.requested", + summary: "Command approval requested", + tone: "approval", + payload: { + requestId: "req-1", + requestKind: "command", + detail: "bun run lint", + }, + }), + }), + ]); + + expect(next.threads[0]?.hasPendingApprovals).toBe(true); + expect(next.sidebarThreadSummaryById["thread-1"]?.hasPendingApprovals).toBe(true); + }); + + it("updates latest turn and thread error immediately from session-set events", () => { + const turnId = TurnId.makeUnsafe("turn-session-1"); + const initialState = makeState(makeThread()); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.session-set", { + threadId: ThreadId.makeUnsafe("thread-1"), + session: { + threadId: ThreadId.makeUnsafe("thread-1"), + status: "running", + providerName: "claudeAgent", + runtimeMode: "approval-required", + activeTurnId: turnId, + lastError: "Turn failed", + updatedAt: "2026-02-27T00:07:00.000Z", + }, + }), + ]); + + expect(next.threads[0]?.session).toMatchObject({ + provider: "claudeAgent", + status: "running", + orchestrationStatus: "running", + activeTurnId: turnId, + }); + expect(next.threads[0]?.error).toBe("Turn failed"); + expect(next.threads[0]?.latestTurn).toEqual({ + turnId, + state: "running", + requestedAt: "2026-02-27T00:07:00.000Z", + startedAt: "2026-02-27T00:07:00.000Z", + completedAt: null, + assistantMessageId: null, + }); + }); + + it("preserves source proposed plan across live turn-start and assistant streaming", () => { + const turnId = TurnId.makeUnsafe("turn-plan-live-1"); + const messageId = MessageId.makeUnsafe("assistant-plan-live-1"); + const sourceProposedPlan = { + threadId: ThreadId.makeUnsafe("thread-plan-source"), + planId: OrchestrationProposedPlanId.makeUnsafe("plan-source"), + }; + const initialState = makeState(makeThread()); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.turn-start-requested", { + threadId: ThreadId.makeUnsafe("thread-1"), + messageId: MessageId.makeUnsafe("user-message-plan-live-1"), + createdAt: "2026-02-27T00:07:30.000Z", + dispatchMode: "queue", + runtimeMode: "approval-required", + interactionMode: DEFAULT_INTERACTION_MODE, + sourceProposedPlan, + }), + makeDomainEvent("thread.message-sent", { + threadId: ThreadId.makeUnsafe("thread-1"), + messageId, + role: "assistant", + text: "Applying plan", + turnId, + streaming: true, + source: "native", + createdAt: "2026-02-27T00:07:31.000Z", + updatedAt: "2026-02-27T00:07:31.000Z", + }), + ]); + + expect(next.threads[0]?.pendingSourceProposedPlan).toEqual(sourceProposedPlan); + expect(next.threads[0]?.latestTurn).toEqual({ + turnId, + state: "running", + requestedAt: "2026-02-27T00:07:31.000Z", + startedAt: "2026-02-27T00:07:31.000Z", + completedAt: null, + assistantMessageId: messageId, + sourceProposedPlan, + }); + }); + + it("downgrades a stale running latest turn when session-set reports a terminal state", () => { + const turnId = TurnId.makeUnsafe("turn-session-terminal-1"); + const initialState = makeState( + makeThread({ + latestTurn: { + turnId, + state: "running", + requestedAt: "2026-02-27T00:07:50.000Z", + startedAt: "2026-02-27T00:07:51.000Z", + completedAt: null, + assistantMessageId: null, + }, + }), + ); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.session-set", { + threadId: ThreadId.makeUnsafe("thread-1"), + session: { + threadId: ThreadId.makeUnsafe("thread-1"), + status: "error", + providerName: "claudeAgent", + runtimeMode: "approval-required", + activeTurnId: null, + lastError: "provider crashed", + updatedAt: "2026-02-27T00:07:55.000Z", + }, + }), + ]); + + expect(next.threads[0]?.latestTurn).toEqual({ + turnId, + state: "error", + requestedAt: "2026-02-27T00:07:50.000Z", + startedAt: "2026-02-27T00:07:51.000Z", + completedAt: "2026-02-27T00:07:55.000Z", + assistantMessageId: null, + }); + expect(next.sidebarThreadSummaryById["thread-1"]?.latestTurn?.state).toBe("error"); + }); + + it("marks a running turn interrupted as soon as interrupt is requested", () => { + const turnId = TurnId.makeUnsafe("turn-stop-1"); + const initialState = makeState( + makeThread({ + latestTurn: { + turnId, + state: "running", + requestedAt: "2026-02-27T00:08:00.000Z", + startedAt: "2026-02-27T00:08:01.000Z", + completedAt: null, + assistantMessageId: null, + }, + }), + ); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.turn-interrupt-requested", { + threadId: ThreadId.makeUnsafe("thread-1"), + turnId, + createdAt: "2026-02-27T00:08:05.000Z", + }), + ]); + + expect(next.threads[0]?.latestTurn).toEqual({ + turnId, + state: "interrupted", + requestedAt: "2026-02-27T00:08:00.000Z", + startedAt: "2026-02-27T00:08:01.000Z", + completedAt: "2026-02-27T00:08:05.000Z", + assistantMessageId: null, + }); + }); + + it("rebinds existing turn diff summaries to the assistant message as soon as it streams in", () => { + const turnId = TurnId.makeUnsafe("turn-diff-1"); + const messageId = MessageId.makeUnsafe("assistant-message-1"); + const initialState = makeState( + makeThread({ + turnDiffSummaries: [ + { + turnId, + completedAt: "2026-02-27T00:09:00.000Z", + files: [], + checkpointRef: CheckpointRef.makeUnsafe("checkpoint-1"), + checkpointTurnCount: 1, + status: "completed", + }, + ], + }), + ); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.message-sent", { + threadId: ThreadId.makeUnsafe("thread-1"), + messageId, + role: "assistant", + text: "Done", + turnId, + streaming: false, + source: "native", + createdAt: "2026-02-27T00:09:01.000Z", + updatedAt: "2026-02-27T00:09:01.000Z", + }), + ]); + + expect(next.threads[0]?.turnDiffSummaries[0]?.assistantMessageId).toBe(messageId); + }); + + it("updates latest turn state from a completed turn diff before snapshot reconciliation", () => { + const turnId = TurnId.makeUnsafe("turn-diff-state-1"); + const sourceProposedPlan = { + threadId: ThreadId.makeUnsafe("thread-plan-source"), + planId: OrchestrationProposedPlanId.makeUnsafe("plan-source"), + }; + const initialState = makeState( + makeThread({ + pendingSourceProposedPlan: sourceProposedPlan, + latestTurn: { + turnId, + state: "running", + requestedAt: "2026-02-27T00:09:30.000Z", + startedAt: "2026-02-27T00:09:31.000Z", + completedAt: null, + assistantMessageId: null, + }, + }), + ); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.turn-diff-completed", { + threadId: ThreadId.makeUnsafe("thread-1"), + turnId, + checkpointTurnCount: 1, + checkpointRef: CheckpointRef.makeUnsafe("checkpoint-1"), + completedAt: "2026-02-27T00:09:35.000Z", + files: [], + status: "ready", + assistantMessageId: MessageId.makeUnsafe("assistant-diff-state-1"), + }), + ]); + + expect(next.threads[0]?.latestTurn).toEqual({ + turnId, + state: "completed", + requestedAt: "2026-02-27T00:09:30.000Z", + startedAt: "2026-02-27T00:09:31.000Z", + completedAt: "2026-02-27T00:09:35.000Z", + assistantMessageId: MessageId.makeUnsafe("assistant-diff-state-1"), + sourceProposedPlan, + }); + expect(next.sidebarThreadSummaryById["thread-1"]?.latestTurn).toEqual({ + turnId, + state: "completed", + requestedAt: "2026-02-27T00:09:30.000Z", + startedAt: "2026-02-27T00:09:31.000Z", + completedAt: "2026-02-27T00:09:35.000Z", + assistantMessageId: MessageId.makeUnsafe("assistant-diff-state-1"), + sourceProposedPlan, + }); + }); + + it("closes the local session immediately when session-stop-requested arrives", () => { + const initialState = makeState( + makeThread({ + session: { + provider: "codex", + status: "running", + orchestrationStatus: "running", + activeTurnId: TurnId.makeUnsafe("turn-stop-2"), + createdAt: "2026-02-27T00:10:00.000Z", + updatedAt: "2026-02-27T00:10:00.000Z", + }, + }), + ); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.session-stop-requested", { + threadId: ThreadId.makeUnsafe("thread-1"), + createdAt: "2026-02-27T00:10:05.000Z", + }), + ]); + + expect(next.threads[0]?.session).toMatchObject({ + status: "closed", + orchestrationStatus: "stopped", + activeTurnId: undefined, + updatedAt: "2026-02-27T00:10:05.000Z", + }); + }); + + it("prunes optimistic turn state immediately when a thread is reverted", () => { + const turnId1 = TurnId.makeUnsafe("turn-keep"); + const turnId2 = TurnId.makeUnsafe("turn-drop"); + const initialState = makeState( + makeThread({ + messages: [ + { + id: MessageId.makeUnsafe("user-keep"), + role: "user", + text: "Keep this", + turnId: turnId1, + createdAt: "2026-02-27T00:11:00.000Z", + completedAt: "2026-02-27T00:11:00.000Z", + streaming: false, + }, + { + id: MessageId.makeUnsafe("assistant-keep"), + role: "assistant", + text: "Kept answer", + turnId: turnId1, + createdAt: "2026-02-27T00:11:01.000Z", + completedAt: "2026-02-27T00:11:01.000Z", + streaming: false, + }, + { + id: MessageId.makeUnsafe("user-drop"), + role: "user", + text: "Drop this", + turnId: turnId2, + createdAt: "2026-02-27T00:12:00.000Z", + completedAt: "2026-02-27T00:12:00.000Z", + streaming: false, + }, + { + id: MessageId.makeUnsafe("assistant-drop"), + role: "assistant", + text: "Dropped answer", + turnId: turnId2, + createdAt: "2026-02-27T00:12:01.000Z", + completedAt: "2026-02-27T00:12:01.000Z", + streaming: false, + }, + ], + proposedPlans: [ + { + id: OrchestrationProposedPlanId.makeUnsafe("plan-keep"), + turnId: turnId1, + planMarkdown: "Keep plan", + implementedAt: null, + implementationThreadId: null, + createdAt: "2026-02-27T00:11:00.000Z", + updatedAt: "2026-02-27T00:11:00.000Z", + }, + { + id: OrchestrationProposedPlanId.makeUnsafe("plan-drop"), + turnId: turnId2, + planMarkdown: "Drop plan", + implementedAt: null, + implementationThreadId: null, + createdAt: "2026-02-27T00:12:00.000Z", + updatedAt: "2026-02-27T00:12:00.000Z", + }, + ], + activities: [ + makeActivity({ + id: "activity-keep", + createdAt: "2026-02-27T00:11:01.500Z", + turnId: "turn-keep", + }), + makeActivity({ + id: "activity-drop", + createdAt: "2026-02-27T00:12:01.500Z", + turnId: "turn-drop", + }), + ], + turnDiffSummaries: [ + { + turnId: turnId1, + completedAt: "2026-02-27T00:11:02.000Z", + files: [], + checkpointTurnCount: 1, + assistantMessageId: MessageId.makeUnsafe("assistant-keep"), + status: "completed", + }, + { + turnId: turnId2, + completedAt: "2026-02-27T00:12:02.000Z", + files: [], + checkpointTurnCount: 2, + assistantMessageId: MessageId.makeUnsafe("assistant-drop"), + status: "completed", + }, + ], + pendingSourceProposedPlan: { + threadId: ThreadId.makeUnsafe("thread-plan-source"), + planId: OrchestrationProposedPlanId.makeUnsafe("plan-source"), + }, + latestTurn: { + turnId: turnId2, + state: "running", + requestedAt: "2026-02-27T00:12:00.000Z", + startedAt: "2026-02-27T00:12:01.000Z", + completedAt: null, + assistantMessageId: MessageId.makeUnsafe("assistant-drop"), + }, + }), + ); + + const next = applyOrchestrationEvents(initialState, [ + makeDomainEvent("thread.reverted", { + threadId: ThreadId.makeUnsafe("thread-1"), + turnCount: 1, + }), + ]); + + expect(next.threads[0]?.pendingSourceProposedPlan).toBeUndefined(); + expect(next.threads[0]?.messages.map((message) => message.id)).toEqual([ + MessageId.makeUnsafe("user-keep"), + MessageId.makeUnsafe("assistant-keep"), + ]); + expect(next.threads[0]?.proposedPlans.map((plan) => plan.id)).toEqual([ + OrchestrationProposedPlanId.makeUnsafe("plan-keep"), + ]); + expect(next.threads[0]?.activities.map((activity) => activity.id)).toEqual([ + EventId.makeUnsafe("activity-keep"), + ]); + expect(next.threads[0]?.turnDiffSummaries.map((summary) => summary.turnId)).toEqual([turnId1]); + expect(next.threads[0]?.latestTurn).toEqual({ + turnId: turnId1, + state: "completed", + requestedAt: "2026-02-27T00:11:02.000Z", + startedAt: "2026-02-27T00:11:02.000Z", + completedAt: "2026-02-27T00:11:02.000Z", + assistantMessageId: MessageId.makeUnsafe("assistant-keep"), + }); + }); }); diff --git a/apps/web/src/store.ts b/apps/web/src/store.ts index fe56bcb0..62c44b10 100644 --- a/apps/web/src/store.ts +++ b/apps/web/src/store.ts @@ -4,6 +4,7 @@ import { Fragment, type ReactNode, createElement, useEffect } from "react"; import { + type OrchestrationEvent, type ProviderKind, ThreadId, type OrchestrationReadModel, @@ -21,13 +22,13 @@ import { type ThreadWorkspacePatch, } from "./types"; import { Debouncer } from "@tanstack/react-pacer"; +import { hasLiveTurnTailWork } from "./session-logic"; import { - derivePendingApprovals, - derivePendingUserInputs, - findLatestProposedPlan, - hasActionableProposedPlan, - hasLiveTurnTailWork, -} from "./session-logic"; + deriveLatestUserMessageAt, + hasActionableProposedPlanSignal, + hasPendingApprovalsSignal, + hasPendingUserInputSignal, +} from "@t3tools/shared/threadSignals"; // ── State ──────────────────────────────────────────────────────────── @@ -41,6 +42,7 @@ export interface AppState { type ReadModelProject = OrchestrationReadModel["projects"][number]; type ReadModelThread = OrchestrationReadModel["threads"][number]; type ReadModelMessage = OrchestrationReadModel["threads"][number]["messages"][number]; +type ThreadMessageSentEvent = Extract; const PERSISTED_STATE_KEY = "t3code:renderer-state:v8"; const LEGACY_PERSISTED_STATE_KEYS = [ @@ -54,6 +56,7 @@ const LEGACY_PERSISTED_STATE_KEYS = [ "codething:renderer-state:v2", "codething:renderer-state:v1", ] as const; +const MAX_THREAD_MESSAGES = 2_000; const initialState: AppState = { projects: [], @@ -516,6 +519,10 @@ function isNonFatalThreadErrorMessage(message: string | null | undefined): boole return normalized.includes("write_stdin failed: stdin is closed for this session"); } +function normalizeThreadErrorMessage(message: string | null | undefined): string | null { + return message && !isNonFatalThreadErrorMessage(message) ? message : null; +} + function normalizeThreadSession( incoming: ReadModelThread["session"], previous: Thread["session"] | undefined | null, @@ -613,6 +620,7 @@ function normalizeThreadFromReadModel( ? incoming.session.lastError : null; const lastVisitedAt = previous?.lastVisitedAt ?? incoming.updatedAt; + const pendingSourceProposedPlan = latestTurn?.sourceProposedPlan; if ( previous && @@ -628,6 +636,11 @@ function normalizeThreadFromReadModel( previous.createdAt === incoming.createdAt && previous.updatedAt === incoming.updatedAt && previous.latestTurn === latestTurn && + previous.pendingSourceProposedPlan === pendingSourceProposedPlan && + previous.latestUserMessageAt === incoming.latestUserMessageAt && + previous.hasPendingApprovals === incoming.hasPendingApprovals && + previous.hasPendingUserInput === incoming.hasPendingUserInput && + previous.hasActionableProposedPlan === incoming.hasActionableProposedPlan && previous.lastVisitedAt === lastVisitedAt && (previous.parentThreadId ?? null) === (incoming.parentThreadId ?? null) && (previous.subagentAgentId ?? null) === (incoming.subagentAgentId ?? null) && @@ -662,6 +675,11 @@ function normalizeThreadFromReadModel( createdAt: incoming.createdAt, updatedAt: incoming.updatedAt, latestTurn, + ...(pendingSourceProposedPlan ? { pendingSourceProposedPlan } : {}), + latestUserMessageAt: incoming.latestUserMessageAt ?? null, + hasPendingApprovals: incoming.hasPendingApprovals ?? false, + hasPendingUserInput: incoming.hasPendingUserInput ?? false, + hasActionableProposedPlan: incoming.hasActionableProposedPlan ?? false, lastVisitedAt, parentThreadId: incoming.parentThreadId ?? null, subagentAgentId: incoming.subagentAgentId ?? null, @@ -783,17 +801,97 @@ function attachmentPreviewRoutePath(attachmentId: string): string { return `/attachments/${encodeURIComponent(attachmentId)}`; } -function getLatestUserMessageAt(messages: ReadonlyArray): string | null { - let latestUserMessageAt: string | null = null; - for (const message of messages) { - if (message.role !== "user") { - continue; - } - if (latestUserMessageAt === null || message.createdAt > latestUserMessageAt) { - latestUserMessageAt = message.createdAt; - } +function checkpointStatusToLatestTurnState( + status: Thread["turnDiffSummaries"][number]["status"], +): NonNullable["state"] { + if (status === "error") { + return "error"; } - return latestUserMessageAt; + if (status === "missing") { + return "interrupted"; + } + return "completed"; +} + +// Preserve the proposed-plan source across the live turn lifecycle until the read model catches up. +function buildLatestTurn(params: { + previous: Thread["latestTurn"]; + turnId: NonNullable["turnId"]; + state: NonNullable["state"]; + requestedAt: string; + startedAt: string | null; + completedAt: string | null; + assistantMessageId: NonNullable["assistantMessageId"]; + sourceProposedPlan?: Thread["pendingSourceProposedPlan"]; +}): NonNullable { + const sourceProposedPlan = + params.previous?.turnId === params.turnId + ? (params.previous.sourceProposedPlan ?? params.sourceProposedPlan) + : params.sourceProposedPlan; + return { + turnId: params.turnId, + state: params.state, + requestedAt: params.requestedAt, + startedAt: params.startedAt, + completedAt: params.completedAt, + assistantMessageId: params.assistantMessageId, + ...(sourceProposedPlan ? { sourceProposedPlan } : {}), + }; +} + +function reconcileLatestTurnFromSession( + thread: Thread, + session: NonNullable, + error: string | null, +): Thread["latestTurn"] { + if (session.status === "running" && session.activeTurnId !== null) { + return buildLatestTurn({ + previous: thread.latestTurn, + turnId: session.activeTurnId, + state: "running", + requestedAt: + thread.latestTurn?.turnId === session.activeTurnId + ? thread.latestTurn.requestedAt + : session.updatedAt, + startedAt: + thread.latestTurn?.turnId === session.activeTurnId + ? (thread.latestTurn.startedAt ?? session.updatedAt) + : session.updatedAt, + completedAt: null, + assistantMessageId: + thread.latestTurn?.turnId === session.activeTurnId + ? thread.latestTurn.assistantMessageId + : null, + sourceProposedPlan: thread.pendingSourceProposedPlan, + }); + } + + if ( + thread.latestTurn === null || + thread.latestTurn.state !== "running" || + thread.latestTurn.completedAt !== null + ) { + return thread.latestTurn; + } + + if ( + session.status !== "error" && + session.status !== "interrupted" && + session.status !== "idle" && + session.status !== "stopped" + ) { + return thread.latestTurn; + } + + return buildLatestTurn({ + previous: thread.latestTurn, + turnId: thread.latestTurn.turnId, + state: session.status === "error" || error ? "error" : "interrupted", + requestedAt: thread.latestTurn.requestedAt, + startedAt: thread.latestTurn.startedAt ?? session.updatedAt, + completedAt: thread.latestTurn.completedAt ?? session.updatedAt, + assistantMessageId: thread.latestTurn.assistantMessageId, + }); } function sidebarThreadSummariesEqual( @@ -851,12 +949,12 @@ function buildSidebarThreadSummary( subagentAgentId: thread.subagentAgentId ?? null, subagentNickname: thread.subagentNickname ?? null, subagentRole: thread.subagentRole ?? null, - latestUserMessageAt: getLatestUserMessageAt(thread.messages), - hasPendingApprovals: derivePendingApprovals(thread.activities).length > 0, - hasPendingUserInput: derivePendingUserInputs(thread.activities).length > 0, - hasActionableProposedPlan: hasActionableProposedPlan( - findLatestProposedPlan(thread.proposedPlans, thread.latestTurn?.turnId ?? null), - ), + latestUserMessageAt: thread.latestUserMessageAt ?? deriveLatestUserMessageAt(thread.messages), + hasPendingApprovals: thread.hasPendingApprovals ?? hasPendingApprovalsSignal(thread.activities), + hasPendingUserInput: thread.hasPendingUserInput ?? hasPendingUserInputSignal(thread.activities), + hasActionableProposedPlan: + thread.hasActionableProposedPlan ?? + hasActionableProposedPlanSignal(thread.proposedPlans, thread.latestTurn), hasLiveTailWork: hasLiveTurnTailWork({ latestTurn: thread.latestTurn, messages: thread.messages, @@ -907,6 +1005,626 @@ function sortTurnDiffSummaries( ); } +function rebindTurnDiffSummariesForAssistantMessage( + turnDiffSummaries: ReadonlyArray, + turnId: Thread["turnDiffSummaries"][number]["turnId"], + assistantMessageId: NonNullable["assistantMessageId"], +): Thread["turnDiffSummaries"] { + let changed = false; + const nextSummaries = turnDiffSummaries.map((summary) => { + if (summary.turnId !== turnId || summary.assistantMessageId === assistantMessageId) { + return summary; + } + changed = true; + return { + ...summary, + assistantMessageId: assistantMessageId ?? undefined, + }; + }); + return changed ? nextSummaries : [...turnDiffSummaries]; +} + +function retainThreadMessagesAfterRevert( + messages: ReadonlyArray, + retainedTurnIds: ReadonlySet, + turnCount: number, +): ChatMessage[] { + const retainedMessageIds = new Set(); + for (const message of messages) { + if (message.role === "system") { + retainedMessageIds.add(message.id); + continue; + } + if ( + message.turnId !== undefined && + message.turnId !== null && + retainedTurnIds.has(message.turnId) + ) { + retainedMessageIds.add(message.id); + } + } + + const retainedUserCount = messages.filter( + (message) => message.role === "user" && retainedMessageIds.has(message.id), + ).length; + const missingUserCount = Math.max(0, turnCount - retainedUserCount); + if (missingUserCount > 0) { + const fallbackUserMessages = messages + .filter( + (message) => + message.role === "user" && + !retainedMessageIds.has(message.id) && + (message.turnId === undefined || + message.turnId === null || + retainedTurnIds.has(message.turnId)), + ) + .toSorted( + (left, right) => + left.createdAt.localeCompare(right.createdAt) || left.id.localeCompare(right.id), + ) + .slice(0, missingUserCount); + for (const message of fallbackUserMessages) { + retainedMessageIds.add(message.id); + } + } + + const retainedAssistantCount = messages.filter( + (message) => message.role === "assistant" && retainedMessageIds.has(message.id), + ).length; + const missingAssistantCount = Math.max(0, turnCount - retainedAssistantCount); + if (missingAssistantCount > 0) { + const fallbackAssistantMessages = messages + .filter( + (message) => + message.role === "assistant" && + !retainedMessageIds.has(message.id) && + (message.turnId === undefined || + message.turnId === null || + retainedTurnIds.has(message.turnId)), + ) + .toSorted( + (left, right) => + left.createdAt.localeCompare(right.createdAt) || left.id.localeCompare(right.id), + ) + .slice(0, missingAssistantCount); + for (const message of fallbackAssistantMessages) { + retainedMessageIds.add(message.id); + } + } + + return messages.filter((message) => retainedMessageIds.has(message.id)); +} + +function retainThreadActivitiesAfterRevert( + activities: ReadonlyArray, + retainedTurnIds: ReadonlySet, +): Thread["activities"] { + return activities.filter( + (activity) => activity.turnId === null || retainedTurnIds.has(activity.turnId), + ); +} + +function retainThreadProposedPlansAfterRevert( + proposedPlans: ReadonlyArray, + retainedTurnIds: ReadonlySet, +): Thread["proposedPlans"] { + return proposedPlans.filter( + (proposedPlan) => proposedPlan.turnId === null || retainedTurnIds.has(proposedPlan.turnId), + ); +} + +function applyTurnDiffSummaryToThread( + thread: Thread, + summary: Thread["turnDiffSummaries"][number], +): Thread { + const previousSummary = thread.turnDiffSummaries.find( + (existingSummary) => existingSummary.turnId === summary.turnId, + ); + const nextSummary = normalizeSingleTurnDiffSummary(summary, previousSummary); + if (previousSummary && previousSummary.status !== "missing" && nextSummary.status === "missing") { + return thread; + } + const turnDiffSummaries = previousSummary + ? thread.turnDiffSummaries.map((existingSummary) => + existingSummary.turnId === nextSummary.turnId ? nextSummary : existingSummary, + ) + : sortTurnDiffSummaries([...thread.turnDiffSummaries, nextSummary]); + + const latestTurn = + thread.latestTurn === null || thread.latestTurn.turnId === nextSummary.turnId + ? buildLatestTurn({ + previous: thread.latestTurn, + turnId: nextSummary.turnId, + state: checkpointStatusToLatestTurnState(nextSummary.status), + requestedAt: thread.latestTurn?.requestedAt ?? nextSummary.completedAt, + startedAt: thread.latestTurn?.startedAt ?? nextSummary.completedAt, + completedAt: nextSummary.completedAt, + assistantMessageId: nextSummary.assistantMessageId ?? null, + sourceProposedPlan: thread.pendingSourceProposedPlan, + }) + : thread.latestTurn; + + if ( + previousSummary === nextSummary && + turnDiffSummaries === thread.turnDiffSummaries && + latestTurn === thread.latestTurn && + (thread.updatedAt ?? thread.createdAt) >= nextSummary.completedAt + ) { + return thread; + } + + return { + ...thread, + turnDiffSummaries: + arraysShallowEqual(thread.turnDiffSummaries, turnDiffSummaries) && + thread.turnDiffSummaries.length === turnDiffSummaries.length + ? thread.turnDiffSummaries + : turnDiffSummaries, + latestTurn, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > nextSummary.completedAt + ? thread.updatedAt + : nextSummary.completedAt, + }; +} + +function deriveThreadStateSignals( + thread: Thread, +): Pick< + Thread, + | "latestUserMessageAt" + | "hasPendingApprovals" + | "hasPendingUserInput" + | "hasActionableProposedPlan" +> { + return { + latestUserMessageAt: deriveLatestUserMessageAt(thread.messages), + hasPendingApprovals: hasPendingApprovalsSignal(thread.activities), + hasPendingUserInput: hasPendingUserInputSignal(thread.activities), + hasActionableProposedPlan: hasActionableProposedPlanSignal( + thread.proposedPlans, + thread.latestTurn, + ), + }; +} + +function withDerivedThreadStateSignals(thread: Thread): Thread { + const nextSignals = deriveThreadStateSignals(thread); + if ( + thread.latestUserMessageAt === nextSignals.latestUserMessageAt && + thread.hasPendingApprovals === nextSignals.hasPendingApprovals && + thread.hasPendingUserInput === nextSignals.hasPendingUserInput && + thread.hasActionableProposedPlan === nextSignals.hasActionableProposedPlan + ) { + return thread; + } + return { + ...thread, + ...nextSignals, + }; +} + +function applyThreadUpdate( + state: AppState, + threadId: ThreadId, + updater: (thread: Thread) => Thread, +): AppState { + let nextThread: Thread | null = null; + const threads = updateThread(state.threads, threadId, (thread) => { + const updatedThread = withDerivedThreadStateSignals(updater(thread)); + if (updatedThread !== thread) { + nextThread = updatedThread; + } + return updatedThread; + }); + if (threads === state.threads) { + return state; + } + if (nextThread === null) { + return { + ...state, + threads, + }; + } + const previousSummary = state.sidebarThreadSummaryById[threadId]; + const nextSummary = buildSidebarThreadSummary(nextThread, previousSummary); + if (nextSummary === previousSummary) { + return { + ...state, + threads, + }; + } + return { + ...state, + threads, + sidebarThreadSummaryById: { + ...state.sidebarThreadSummaryById, + [threadId]: nextSummary, + }, + }; +} + +function mergeStreamingMessage( + existingMessage: ChatMessage, + incomingMessage: ChatMessage, +): ChatMessage | null { + const nextText = + incomingMessage.streaming || incomingMessage.text.length === 0 + ? `${existingMessage.text}${incomingMessage.text}` + : incomingMessage.text; + const nextAttachments = incomingMessage.attachments ?? existingMessage.attachments; + const nextCompletedAt = incomingMessage.streaming + ? existingMessage.completedAt + : (incomingMessage.completedAt ?? existingMessage.completedAt); + const nextTurnId = + incomingMessage.turnId !== undefined ? incomingMessage.turnId : existingMessage.turnId; + const nextSource = incomingMessage.source ?? existingMessage.source; + + if ( + existingMessage.text === nextText && + existingMessage.streaming === incomingMessage.streaming && + existingMessage.attachments === nextAttachments && + existingMessage.completedAt === nextCompletedAt && + existingMessage.turnId === nextTurnId && + existingMessage.source === nextSource + ) { + return null; + } + + return { + ...existingMessage, + text: nextText, + streaming: incomingMessage.streaming, + ...(nextAttachments ? { attachments: nextAttachments } : {}), + ...(nextTurnId !== undefined ? { turnId: nextTurnId } : {}), + ...(nextSource !== undefined ? { source: nextSource } : {}), + ...(nextCompletedAt !== undefined ? { completedAt: nextCompletedAt } : {}), + }; +} + +function applyThreadMessageSentEvent(thread: Thread, event: ThreadMessageSentEvent): Thread { + const payload = event.payload; + const incomingMessage = normalizeChatMessage( + { + id: payload.messageId, + role: payload.role, + text: payload.text, + turnId: payload.turnId, + attachments: payload.attachments ?? [], + streaming: payload.streaming, + source: payload.source, + createdAt: payload.createdAt, + updatedAt: payload.updatedAt, + }, + thread.messages.find((message) => message.id === payload.messageId), + ); + const existingIndex = thread.messages.findIndex((message) => message.id === payload.messageId); + let messages = thread.messages; + + if (existingIndex >= 0) { + const existingMessage = thread.messages[existingIndex]; + if (!existingMessage) { + return thread; + } + const mergedMessage = mergeStreamingMessage(existingMessage, incomingMessage); + if (mergedMessage !== null) { + messages = thread.messages.map((message, index) => + index === existingIndex ? mergedMessage : message, + ); + } + } else { + messages = [...thread.messages, incomingMessage].slice(-MAX_THREAD_MESSAGES); + } + const turnDiffSummaries = + payload.role === "assistant" && payload.turnId !== null + ? rebindTurnDiffSummariesForAssistantMessage( + thread.turnDiffSummaries, + payload.turnId, + payload.messageId, + ) + : thread.turnDiffSummaries; + + let latestTurn = thread.latestTurn; + if ( + payload.role === "assistant" && + payload.turnId !== null && + (thread.latestTurn === null || thread.latestTurn.turnId === payload.turnId) + ) { + const previousTurn = thread.latestTurn; + const nextRequestedAt = previousTurn?.requestedAt ?? payload.createdAt; + const nextStartedAt = previousTurn?.startedAt ?? payload.createdAt; + const nextCompletedAt = payload.streaming + ? (previousTurn?.completedAt ?? null) + : payload.updatedAt; + const nextState = payload.streaming + ? "running" + : previousTurn?.state === "interrupted" + ? "interrupted" + : previousTurn?.state === "error" + ? "error" + : "completed"; + const nextLatestTurn = buildLatestTurn({ + previous: previousTurn, + turnId: payload.turnId, + state: nextState, + requestedAt: nextRequestedAt, + startedAt: nextStartedAt, + completedAt: nextCompletedAt, + assistantMessageId: payload.messageId, + sourceProposedPlan: thread.pendingSourceProposedPlan, + }); + latestTurn = + previousTurn && + previousTurn.turnId === nextLatestTurn.turnId && + previousTurn.state === nextLatestTurn.state && + previousTurn.requestedAt === nextLatestTurn.requestedAt && + previousTurn.startedAt === nextLatestTurn.startedAt && + previousTurn.completedAt === nextLatestTurn.completedAt && + previousTurn.assistantMessageId === nextLatestTurn.assistantMessageId && + previousTurn.sourceProposedPlan === nextLatestTurn.sourceProposedPlan + ? previousTurn + : nextLatestTurn; + } + + const updatedAt = + thread.updatedAt && thread.updatedAt > payload.updatedAt ? thread.updatedAt : payload.updatedAt; + if ( + messages === thread.messages && + turnDiffSummaries === thread.turnDiffSummaries && + latestTurn === thread.latestTurn && + updatedAt === thread.updatedAt + ) { + return thread; + } + + return { + ...thread, + messages, + turnDiffSummaries, + latestTurn, + updatedAt, + }; +} + +function applyOrchestrationEvent(state: AppState, event: OrchestrationEvent): AppState { + switch (event.type) { + case "thread.message-sent": + return applyThreadUpdate(state, event.payload.threadId, (thread) => + applyThreadMessageSentEvent(thread, event), + ); + + case "thread.session-set": + return applyThreadUpdate(state, event.payload.threadId, (thread) => { + const session = normalizeThreadSession(event.payload.session, thread.session); + const error = normalizeThreadErrorMessage(event.payload.session.lastError); + const latestTurn = reconcileLatestTurnFromSession(thread, event.payload.session, error); + if ( + session === thread.session && + error === thread.error && + latestTurn === thread.latestTurn + ) { + return thread; + } + return { + ...thread, + session, + error, + latestTurn, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > event.occurredAt + ? thread.updatedAt + : event.occurredAt, + }; + }); + + case "thread.turn-interrupt-requested": { + if (event.payload.turnId === undefined) { + return state; + } + return applyThreadUpdate(state, event.payload.threadId, (thread) => { + const latestTurn = thread.latestTurn; + if (latestTurn === null || latestTurn.turnId !== event.payload.turnId) { + return thread; + } + return { + ...thread, + latestTurn: { + ...latestTurn, + state: "interrupted", + startedAt: latestTurn.startedAt ?? event.payload.createdAt, + completedAt: latestTurn.completedAt ?? event.payload.createdAt, + }, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > event.occurredAt + ? thread.updatedAt + : event.occurredAt, + }; + }); + } + + case "thread.session-stop-requested": + return applyThreadUpdate(state, event.payload.threadId, (thread) => + thread.session === null + ? thread + : { + ...thread, + session: { + ...thread.session, + status: "closed", + orchestrationStatus: "stopped", + activeTurnId: undefined, + updatedAt: event.payload.createdAt, + }, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > event.occurredAt + ? thread.updatedAt + : event.occurredAt, + }, + ); + + case "thread.turn-start-requested": + return applyThreadUpdate(state, event.payload.threadId, (thread) => { + const modelSelection = + event.payload.modelSelection !== undefined + ? normalizeModelSelection(event.payload.modelSelection, thread.modelSelection) + : thread.modelSelection; + if ( + modelSelection === thread.modelSelection && + thread.runtimeMode === event.payload.runtimeMode && + thread.interactionMode === event.payload.interactionMode && + thread.pendingSourceProposedPlan === event.payload.sourceProposedPlan && + (thread.updatedAt ?? thread.createdAt) >= event.payload.createdAt + ) { + return thread; + } + return { + ...thread, + modelSelection, + runtimeMode: event.payload.runtimeMode, + interactionMode: event.payload.interactionMode, + pendingSourceProposedPlan: event.payload.sourceProposedPlan, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > event.payload.createdAt + ? thread.updatedAt + : event.payload.createdAt, + }; + }); + + case "thread.activity-appended": + return applyThreadUpdate(state, event.payload.threadId, (thread) => { + const nextActivities = normalizeActivities( + [...thread.activities, event.payload.activity], + thread.activities, + ); + if (nextActivities === thread.activities) { + return thread; + } + return { + ...thread, + activities: nextActivities, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > event.payload.activity.createdAt + ? thread.updatedAt + : event.payload.activity.createdAt, + }; + }); + + case "thread.proposed-plan-upserted": + return applyThreadUpdate(state, event.payload.threadId, (thread) => { + const previousPlanIndex = thread.proposedPlans.findIndex( + (plan) => plan.id === event.payload.proposedPlan.id, + ); + const nextPlan = normalizeProposedPlans( + [event.payload.proposedPlan], + previousPlanIndex >= 0 ? [thread.proposedPlans[previousPlanIndex]!] : undefined, + )[0]; + if (!nextPlan) { + return thread; + } + const proposedPlans = + previousPlanIndex >= 0 + ? thread.proposedPlans.map((plan, index) => + index === previousPlanIndex ? nextPlan : plan, + ) + : [...thread.proposedPlans, nextPlan]; + if (arraysShallowEqual(thread.proposedPlans, proposedPlans)) { + return thread; + } + return { + ...thread, + proposedPlans, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > event.payload.proposedPlan.updatedAt + ? thread.updatedAt + : event.payload.proposedPlan.updatedAt, + }; + }); + + case "thread.turn-diff-completed": + return applyThreadUpdate(state, event.payload.threadId, (thread) => + applyTurnDiffSummaryToThread(thread, { + turnId: event.payload.turnId, + completedAt: event.payload.completedAt, + status: event.payload.status, + files: event.payload.files.map((file) => ({ + path: file.path, + ...(file.kind !== undefined ? { kind: file.kind } : {}), + ...(file.additions !== undefined ? { additions: file.additions } : {}), + ...(file.deletions !== undefined ? { deletions: file.deletions } : {}), + })), + checkpointRef: event.payload.checkpointRef, + assistantMessageId: event.payload.assistantMessageId ?? undefined, + checkpointTurnCount: event.payload.checkpointTurnCount, + }), + ); + + case "thread.reverted": + return applyThreadUpdate(state, event.payload.threadId, (thread) => { + const turnDiffSummaries = thread.turnDiffSummaries + .filter( + (entry) => + entry.checkpointTurnCount !== undefined && + entry.checkpointTurnCount <= event.payload.turnCount, + ) + .toSorted( + (left, right) => + (left.checkpointTurnCount ?? Number.MAX_SAFE_INTEGER) - + (right.checkpointTurnCount ?? Number.MAX_SAFE_INTEGER), + ); + const retainedTurnIds = new Set(turnDiffSummaries.map((entry) => entry.turnId)); + const messages = retainThreadMessagesAfterRevert( + thread.messages, + retainedTurnIds, + event.payload.turnCount, + ).slice(-MAX_THREAD_MESSAGES); + const proposedPlans = retainThreadProposedPlansAfterRevert( + thread.proposedPlans, + retainedTurnIds, + ); + const activities = retainThreadActivitiesAfterRevert(thread.activities, retainedTurnIds); + const latestCheckpoint = turnDiffSummaries.at(-1) ?? null; + + return { + ...thread, + turnDiffSummaries, + messages, + proposedPlans, + activities, + pendingSourceProposedPlan: undefined, + latestTurn: + latestCheckpoint === null + ? null + : { + turnId: latestCheckpoint.turnId, + state: checkpointStatusToLatestTurnState(latestCheckpoint.status), + requestedAt: latestCheckpoint.completedAt, + startedAt: latestCheckpoint.completedAt, + completedAt: latestCheckpoint.completedAt, + assistantMessageId: latestCheckpoint.assistantMessageId ?? null, + }, + updatedAt: + (thread.updatedAt ?? thread.createdAt) > event.occurredAt + ? thread.updatedAt + : event.occurredAt, + }; + }); + + default: + return state; + } +} + +export function applyOrchestrationEvents( + state: AppState, + events: ReadonlyArray, +): AppState { + let nextState = state; + for (const event of events) { + nextState = applyOrchestrationEvent(nextState, event); + } + return nextState; +} + // ── Pure state transition functions ──────────────────────────────────── export function syncServerReadModel(state: AppState, readModel: OrchestrationReadModel): AppState { @@ -957,49 +1675,6 @@ export function syncServerReadModel(state: AppState, readModel: OrchestrationRea }; } -export function applyThreadTurnDiffCompleted( - state: AppState, - input: { - readonly threadId: ThreadId; - readonly summary: Thread["turnDiffSummaries"][number]; - }, -): AppState { - const threads = updateThread(state.threads, input.threadId, (thread) => { - const previousSummary = thread.turnDiffSummaries.find( - (summary) => summary.turnId === input.summary.turnId, - ); - const nextSummary = normalizeSingleTurnDiffSummary(input.summary, previousSummary); - const turnDiffSummaries = previousSummary - ? thread.turnDiffSummaries.map((summary) => - summary.turnId === nextSummary.turnId ? nextSummary : summary, - ) - : sortTurnDiffSummaries([...thread.turnDiffSummaries, nextSummary]); - - if ( - previousSummary === nextSummary && - turnDiffSummaries === thread.turnDiffSummaries && - (thread.updatedAt ?? thread.createdAt) >= nextSummary.completedAt - ) { - return thread; - } - - return { - ...thread, - turnDiffSummaries: - arraysShallowEqual(thread.turnDiffSummaries, turnDiffSummaries) && - thread.turnDiffSummaries.length === turnDiffSummaries.length - ? thread.turnDiffSummaries - : turnDiffSummaries, - updatedAt: - (thread.updatedAt ?? thread.createdAt) > nextSummary.completedAt - ? thread.updatedAt - : nextSummary.completedAt, - }; - }); - - return threads === state.threads ? state : { ...state, threads }; -} - export function markThreadVisited( state: AppState, threadId: ThreadId, @@ -1221,10 +1896,7 @@ export function setThreadWorkspace( interface AppStore extends AppState { syncServerReadModel: (readModel: OrchestrationReadModel) => void; - applyThreadTurnDiffCompleted: ( - threadId: ThreadId, - summary: Thread["turnDiffSummaries"][number], - ) => void; + applyOrchestrationEvents: (events: ReadonlyArray) => void; markThreadVisited: (threadId: ThreadId, visitedAt?: string) => void; markThreadUnread: (threadId: ThreadId) => void; toggleProject: (projectId: Project["id"]) => void; @@ -1240,8 +1912,7 @@ interface AppStore extends AppState { export const useStore = create((set) => ({ ...readPersistedState(), syncServerReadModel: (readModel) => set((state) => syncServerReadModel(state, readModel)), - applyThreadTurnDiffCompleted: (threadId, summary) => - set((state) => applyThreadTurnDiffCompleted(state, { threadId, summary })), + applyOrchestrationEvents: (events) => set((state) => applyOrchestrationEvents(state, events)), markThreadVisited: (threadId, visitedAt) => set((state) => markThreadVisited(state, threadId, visitedAt)), markThreadUnread: (threadId) => set((state) => markThreadUnread(state, threadId)), diff --git a/apps/web/src/types.ts b/apps/web/src/types.ts index b1330b0d..5d97dd3f 100644 --- a/apps/web/src/types.ts +++ b/apps/web/src/types.ts @@ -158,6 +158,11 @@ export interface Thread extends ThreadWorkspaceState { createdAt: string; updatedAt?: string | undefined; latestTurn: OrchestrationLatestTurn | null; + pendingSourceProposedPlan?: OrchestrationLatestTurn["sourceProposedPlan"]; + latestUserMessageAt: string | null; + hasPendingApprovals: boolean; + hasPendingUserInput: boolean; + hasActionableProposedPlan: boolean; lastVisitedAt?: string | undefined; parentThreadId?: ThreadId | null; subagentAgentId?: string | null; diff --git a/apps/web/src/worktreeCleanup.test.ts b/apps/web/src/worktreeCleanup.test.ts index 84121e87..0a3f6d5e 100644 --- a/apps/web/src/worktreeCleanup.test.ts +++ b/apps/web/src/worktreeCleanup.test.ts @@ -24,6 +24,10 @@ function makeThread(overrides: Partial = {}): Thread { error: null, createdAt: "2026-02-13T00:00:00.000Z", latestTurn: null, + latestUserMessageAt: null, + hasPendingApprovals: false, + hasPendingUserInput: false, + hasActionableProposedPlan: false, branch: null, worktreePath: null, ...overrides, diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 642bcb30..ee5c69a4 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -363,6 +363,18 @@ export const OrchestrationThread = Schema.Struct({ latestTurn: Schema.NullOr(OrchestrationLatestTurn), createdAt: IsoDateTime, updatedAt: IsoDateTime, + latestUserMessageAt: Schema.optional(Schema.NullOr(IsoDateTime)).pipe( + Schema.withDecodingDefault(() => null), + ), + hasPendingApprovals: Schema.optional(Schema.Boolean).pipe( + Schema.withDecodingDefault(() => false), + ), + hasPendingUserInput: Schema.optional(Schema.Boolean).pipe( + Schema.withDecodingDefault(() => false), + ), + hasActionableProposedPlan: Schema.optional(Schema.Boolean).pipe( + Schema.withDecodingDefault(() => false), + ), archivedAt: Schema.optional(Schema.NullOr(IsoDateTime)).pipe( Schema.withDecodingDefault(() => null), ), diff --git a/packages/shared/package.json b/packages/shared/package.json index 045e0f12..be9568cd 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -52,6 +52,10 @@ "types": "./src/threadWorkspace.ts", "import": "./src/threadWorkspace.ts" }, + "./threadSignals": { + "types": "./src/threadSignals.ts", + "import": "./src/threadSignals.ts" + }, "./subagents": { "types": "./src/subagents.ts", "import": "./src/subagents.ts" diff --git a/packages/shared/src/threadSignals.test.ts b/packages/shared/src/threadSignals.test.ts new file mode 100644 index 00000000..29d957f2 --- /dev/null +++ b/packages/shared/src/threadSignals.test.ts @@ -0,0 +1,179 @@ +// FILE: threadSignals.test.ts +// Purpose: Covers shared thread signal helpers used by both snapshot projection and live web state. +// Exports: Vitest coverage for pending-request badges, latest user timestamps, and proposed-plan actionability. + +import { describe, expect, it } from "vitest"; +import { + EventId, + OrchestrationProposedPlanId, + TurnId, + type OrchestrationThreadActivity, +} from "@t3tools/contracts"; + +import { + compareThreadActivitiesByOrder, + derivePendingApprovalSignals, + deriveLatestUserMessageAt, + hasActionableProposedPlanSignal, + hasPendingApprovalsSignal, + hasPendingUserInputSignal, +} from "./threadSignals"; + +function makeActivity( + overrides: Partial & { kind: OrchestrationThreadActivity["kind"] }, +): OrchestrationThreadActivity { + return { + id: EventId.makeUnsafe(overrides.id ?? crypto.randomUUID()), + createdAt: overrides.createdAt ?? "2026-02-27T00:00:00.000Z", + kind: overrides.kind, + summary: overrides.summary ?? overrides.kind, + tone: overrides.tone ?? "info", + payload: overrides.payload ?? {}, + turnId: overrides.turnId ?? null, + ...(overrides.sequence !== undefined ? { sequence: overrides.sequence } : {}), + }; +} + +describe("threadSignals", () => { + it("derives the latest user message timestamp", () => { + expect( + deriveLatestUserMessageAt([ + { role: "assistant", createdAt: "2026-02-27T00:00:01.000Z" }, + { role: "user", createdAt: "2026-02-27T00:00:02.000Z" }, + { role: "user", createdAt: "2026-02-27T00:00:03.000Z" }, + ]), + ).toBe("2026-02-27T00:00:03.000Z"); + }); + + it("ignores malformed approval requests when computing pending approval badges", () => { + expect( + hasPendingApprovalsSignal([ + makeActivity({ + kind: "approval.requested", + payload: { + requestId: "req-invalid", + }, + }), + ]), + ).toBe(false); + + expect( + hasPendingApprovalsSignal([ + makeActivity({ + kind: "approval.requested", + payload: { + requestId: "req-valid", + requestType: "exec_command_approval", + }, + }), + ]), + ).toBe(true); + }); + + it("keeps pending approval ordering aligned with activity sequence", () => { + const approvals = derivePendingApprovalSignals([ + makeActivity({ + id: EventId.makeUnsafe("approval-resolved"), + kind: "approval.resolved", + sequence: 2, + payload: { requestId: "req-sequenced" }, + }), + makeActivity({ + id: EventId.makeUnsafe("approval-open"), + kind: "approval.requested", + sequence: 1, + payload: { + requestId: "req-sequenced", + requestType: "exec_command_approval", + }, + }), + ]); + + expect(approvals).toHaveLength(0); + expect( + [2, 1] + .map((sequence) => + makeActivity({ + id: EventId.makeUnsafe(`approval-order-${sequence}`), + kind: "approval.requested", + sequence, + payload: { + requestId: `req-order-${sequence}`, + requestType: "exec_command_approval", + }, + }), + ) + .toSorted(compareThreadActivitiesByOrder) + .map((activity) => activity.sequence), + ).toEqual([1, 2]); + }); + + it("only marks pending user input when the questions payload is renderable", () => { + expect( + hasPendingUserInputSignal([ + makeActivity({ + kind: "user-input.requested", + payload: { + requestId: "user-input-invalid", + questions: [{ id: "q1" }], + }, + }), + ]), + ).toBe(false); + + expect( + hasPendingUserInputSignal([ + makeActivity({ + kind: "user-input.requested", + payload: { + requestId: "user-input-valid", + questions: [ + { + id: "q1", + header: "Need input", + question: "Choose one", + options: [{ label: "A", description: "First option" }], + }, + ], + }, + }), + ]), + ).toBe(true); + }); + + it("prefers the latest plan for the active turn when deciding actionability", () => { + expect( + hasActionableProposedPlanSignal( + [ + { + id: OrchestrationProposedPlanId.makeUnsafe("plan-latest-other"), + turnId: TurnId.makeUnsafe("turn-other"), + updatedAt: "2026-02-27T00:00:04.000Z", + implementedAt: "2026-02-27T00:00:05.000Z", + }, + { + id: OrchestrationProposedPlanId.makeUnsafe("plan-active"), + turnId: TurnId.makeUnsafe("turn-active"), + updatedAt: "2026-02-27T00:00:03.000Z", + implementedAt: null, + }, + ], + { turnId: TurnId.makeUnsafe("turn-active") }, + ), + ).toBe(true); + + expect( + hasActionableProposedPlanSignal( + [ + { + id: OrchestrationProposedPlanId.makeUnsafe("plan-done"), + turnId: TurnId.makeUnsafe("turn-active"), + updatedAt: "2026-02-27T00:00:03.000Z", + implementedAt: "2026-02-27T00:00:06.000Z", + }, + ], + { turnId: TurnId.makeUnsafe("turn-active") }, + ), + ).toBe(false); + }); +}); diff --git a/packages/shared/src/threadSignals.ts b/packages/shared/src/threadSignals.ts new file mode 100644 index 00000000..920bead3 --- /dev/null +++ b/packages/shared/src/threadSignals.ts @@ -0,0 +1,321 @@ +// FILE: threadSignals.ts +// Purpose: Shared helpers for lightweight thread/sidebar signals derived from messages, plans, and activities. +// Exports: Pure signal helpers used by server snapshot projection and web live state reconciliation. + +import type { + OrchestrationLatestTurn, + OrchestrationMessage, + OrchestrationProposedPlan, + OrchestrationThreadActivity, + UserInputQuestion, +} from "@t3tools/contracts"; +import { ApprovalRequestId } from "@t3tools/contracts"; + +type MessageLike = Pick; +type ProposedPlanLike = Pick< + OrchestrationProposedPlan, + "id" | "turnId" | "updatedAt" | "implementedAt" +>; +type ApprovalRequestKind = "command" | "file-read" | "file-change"; + +export interface PendingApprovalSignal { + requestId: ApprovalRequestId; + requestKind: ApprovalRequestKind; + createdAt: string; + detail?: string; +} + +export interface PendingUserInputSignal { + requestId: ApprovalRequestId; + createdAt: string; + questions: ReadonlyArray; +} + +function asRecord(value: unknown): Record | null { + return value && typeof value === "object" ? (value as Record) : null; +} + +function isStalePendingRequestFailureDetail(detail: string | undefined): boolean { + const normalized = detail?.toLowerCase(); + if (!normalized) { + return false; + } + return ( + normalized.includes("stale pending approval request") || + normalized.includes("stale pending user-input request") || + normalized.includes("unknown pending approval request") || + normalized.includes("unknown pending permission request") || + normalized.includes("unknown pending user-input request") + ); +} + +function compareActivityLifecycleRank(kind: string): number { + if (kind.endsWith(".started") || kind === "tool.started") { + return 0; + } + if (kind.endsWith(".progress") || kind.endsWith(".updated")) { + return 1; + } + if (kind.endsWith(".completed") || kind.endsWith(".resolved")) { + return 2; + } + return 1; +} + +export function compareThreadActivitiesByOrder( + left: OrchestrationThreadActivity, + right: OrchestrationThreadActivity, +): number { + if (left.sequence !== undefined && right.sequence !== undefined) { + if (left.sequence !== right.sequence) { + return left.sequence - right.sequence; + } + } else if (left.sequence !== undefined) { + return 1; + } else if (right.sequence !== undefined) { + return -1; + } + + const createdAtComparison = left.createdAt.localeCompare(right.createdAt); + if (createdAtComparison !== 0) { + return createdAtComparison; + } + + const lifecycleRankComparison = + compareActivityLifecycleRank(left.kind) - compareActivityLifecycleRank(right.kind); + if (lifecycleRankComparison !== 0) { + return lifecycleRankComparison; + } + + return left.id.localeCompare(right.id); +} + +function requestKindFromRequestType(requestType: unknown): ApprovalRequestKind | null { + switch (requestType) { + case "command_execution_approval": + case "exec_command_approval": + return "command"; + case "file_read_approval": + return "file-read"; + case "file_change_approval": + case "apply_patch_approval": + return "file-change"; + default: + return null; + } +} + +export function requestKindFromActivityPayload( + payload: Record | null, +): ApprovalRequestKind | null { + if (!payload) { + return null; + } + if ( + payload.requestKind === "command" || + payload.requestKind === "file-read" || + payload.requestKind === "file-change" + ) { + return payload.requestKind; + } + return requestKindFromRequestType(payload.requestType); +} + +function parseUserInputQuestions( + payload: Record | null, +): ReadonlyArray | null { + const questions = payload?.questions; + if (!Array.isArray(questions)) { + return null; + } + const parsed = questions + .map((entry) => { + if (!entry || typeof entry !== "object") { + return null; + } + const question = entry as Record; + if ( + typeof question.id !== "string" || + typeof question.header !== "string" || + typeof question.question !== "string" || + !Array.isArray(question.options) + ) { + return null; + } + const options = question.options + .map((option) => { + if (!option || typeof option !== "object") { + return null; + } + const optionRecord = option as Record; + if ( + typeof optionRecord.label !== "string" || + typeof optionRecord.description !== "string" + ) { + return null; + } + return { + label: optionRecord.label, + description: optionRecord.description, + }; + }) + .filter((option): option is UserInputQuestion["options"][number] => option !== null); + if (options.length === 0) { + return null; + } + return { + id: question.id, + header: question.header, + question: question.question, + options, + }; + }) + .filter((question): question is UserInputQuestion => question !== null); + return parsed.length > 0 ? parsed : null; +} + +export function derivePendingApprovalSignals( + activities: ReadonlyArray, +): ReadonlyArray { + const openByRequestId = new Map(); + + for (const activity of [...activities].toSorted(compareThreadActivitiesByOrder)) { + const payload = asRecord(activity.payload); + const requestId = + typeof payload?.requestId === "string" + ? ApprovalRequestId.makeUnsafe(payload.requestId) + : null; + const detail = typeof payload?.detail === "string" ? payload.detail : undefined; + const requestKind = requestKindFromActivityPayload(payload); + + if (activity.kind === "approval.requested" && requestId && requestKind) { + openByRequestId.set(requestId, { + requestId, + requestKind, + createdAt: activity.createdAt, + ...(detail ? { detail } : {}), + }); + continue; + } + if (activity.kind === "approval.resolved" && requestId) { + openByRequestId.delete(requestId); + continue; + } + if ( + activity.kind === "provider.approval.respond.failed" && + requestId && + isStalePendingRequestFailureDetail(detail) + ) { + openByRequestId.delete(requestId); + } + } + + return [...openByRequestId.values()].toSorted((left, right) => + left.createdAt.localeCompare(right.createdAt), + ); +} + +// Keep reconnect/startup summaries aligned with live UI by applying the same request-shape guards. +export function hasPendingApprovalsSignal( + activities: ReadonlyArray, +): boolean { + return derivePendingApprovalSignals(activities).length > 0; +} + +export function derivePendingUserInputSignals( + activities: ReadonlyArray, +): ReadonlyArray { + const openByRequestId = new Map(); + + for (const activity of [...activities].toSorted(compareThreadActivitiesByOrder)) { + const payload = asRecord(activity.payload); + const requestId = + typeof payload?.requestId === "string" + ? ApprovalRequestId.makeUnsafe(payload.requestId) + : null; + const detail = typeof payload?.detail === "string" ? payload.detail : undefined; + + if (activity.kind === "user-input.requested" && requestId) { + const questions = parseUserInputQuestions(payload); + if (!questions) { + continue; + } + openByRequestId.set(requestId, { + requestId, + createdAt: activity.createdAt, + questions, + }); + continue; + } + if (activity.kind === "user-input.resolved" && requestId) { + openByRequestId.delete(requestId); + continue; + } + if ( + activity.kind === "provider.user-input.respond.failed" && + requestId && + isStalePendingRequestFailureDetail(detail) + ) { + openByRequestId.delete(requestId); + } + } + + return [...openByRequestId.values()].toSorted((left, right) => + left.createdAt.localeCompare(right.createdAt), + ); +} + +// User-input badges should only light up when the request still has a renderable question set. +export function hasPendingUserInputSignal( + activities: ReadonlyArray, +): boolean { + return derivePendingUserInputSignals(activities).length > 0; +} + +export function deriveLatestUserMessageAt(messages: ReadonlyArray): string | null { + let latestUserMessageAt: string | null = null; + for (const message of messages) { + if (message.role !== "user") { + continue; + } + if (latestUserMessageAt === null || message.createdAt > latestUserMessageAt) { + latestUserMessageAt = message.createdAt; + } + } + return latestUserMessageAt; +} + +export function findLatestProposedPlanSignal( + proposedPlans: ReadonlyArray, + latestTurnId: OrchestrationLatestTurn["turnId"] | string | null | undefined, +): TPlan | null { + if (latestTurnId) { + const matchingTurnPlan = [...proposedPlans] + .filter((plan) => plan.turnId === latestTurnId) + .toSorted( + (left, right) => + left.updatedAt.localeCompare(right.updatedAt) || left.id.localeCompare(right.id), + ) + .at(-1); + if (matchingTurnPlan) { + return matchingTurnPlan; + } + } + + return ( + [...proposedPlans] + .toSorted( + (left, right) => + left.updatedAt.localeCompare(right.updatedAt) || left.id.localeCompare(right.id), + ) + .at(-1) ?? null + ); +} + +export function hasActionableProposedPlanSignal( + proposedPlans: ReadonlyArray, + latestTurn: Pick | null, +): boolean { + const latestPlan = findLatestProposedPlanSignal(proposedPlans, latestTurn?.turnId ?? null); + return latestPlan !== null && latestPlan.implementedAt === null; +}