Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ import {
type ProjectionThreadCheckpointContext,
type ProjectionSnapshotQueryShape,
} from "../Services/ProjectionSnapshotQuery.ts";
import {
deriveLatestUserMessageAt,
hasActionableProposedPlanSignal,
hasPendingApprovalsSignal,
hasPendingUserInputSignal,
} from "@t3tools/shared/threadSignals";

const decodeReadModel = Schema.decodeUnknownEffect(OrchestrationReadModel);
const ProjectionProjectDbRowSchema = ProjectionProject.mapFields(
Expand Down Expand Up @@ -251,6 +257,7 @@ function toProjectedThread(input: {
readonly session: OrchestrationSession | null;
}): OrchestrationThread {
const { threadRow } = input;
const latestUserMessageAt = deriveLatestUserMessageAt(input.messages);
return {
id: threadRow.threadId,
projectId: threadRow.projectId,
Expand All @@ -272,6 +279,13 @@ function toProjectedThread(input: {
latestTurn: input.latestTurn,
createdAt: threadRow.createdAt,
updatedAt: threadRow.updatedAt,
latestUserMessageAt,
hasPendingApprovals: hasPendingApprovalsSignal(input.activities),
hasPendingUserInput: hasPendingUserInputSignal(input.activities),
hasActionableProposedPlan: hasActionableProposedPlanSignal(
input.proposedPlans,
input.latestTurn,
),
archivedAt: threadRow.archivedAt ?? null,
deletedAt: threadRow.deletedAt,
handoff: threadRow.handoff,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,53 @@ describe("ProviderCommandReactor", () => {
expect(thread?.session?.runtimeMode).toBe("approval-required");
});

it("starts a fresh provider session when the projected session is stale after restart", async () => {
const harness = await createHarness();
const now = new Date().toISOString();

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.session.set",
commandId: CommandId.makeUnsafe("cmd-session-stale-projected"),
threadId: ThreadId.makeUnsafe("thread-1"),
session: {
threadId: ThreadId.makeUnsafe("thread-1"),
status: "ready",
providerName: "codex",
runtimeMode: "approval-required",
activeTurnId: null,
lastError: null,
updatedAt: now,
},
createdAt: now,
}),
);

harness.startSession.mockClear();
harness.sendTurn.mockClear();

await Effect.runPromise(
harness.engine.dispatch({
type: "thread.turn.start",
commandId: CommandId.makeUnsafe("cmd-turn-start-after-stale-session"),
threadId: ThreadId.makeUnsafe("thread-1"),
message: {
messageId: asMessageId("user-message-stale-session"),
role: "user",
text: "resume after backend restart",
attachments: [],
},
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
runtimeMode: "approval-required",
createdAt: now,
}),
);

await waitFor(() => harness.startSession.mock.calls.length === 1);
await waitFor(() => harness.sendTurn.mock.calls.length === 1);
expect(harness.startSession.mock.calls[0]?.[0]).toEqual(ThreadId.makeUnsafe("thread-1"));
});

it("renames a generic first-turn thread title using text generation", async () => {
const harness = await createHarness();
const now = new Date().toISOString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,14 +388,15 @@ const make = Effect.gen(function* () {
createdAt,
});

const activeSession = thread.session ? yield* resolveActiveSession(thread.id) : undefined;
// Only trust the projected session if the provider runtime still has a live session.
const existingSessionThreadId =
thread.session && thread.session.status !== "stopped" ? thread.id : null;
thread.session && thread.session.status !== "stopped" && activeSession ? thread.id : null;
if (existingSessionThreadId) {
const runtimeModeChanged = thread.runtimeMode !== thread.session?.runtimeMode;
const providerChanged =
requestedModelSelection !== undefined &&
requestedModelSelection.provider !== currentProvider;
const activeSession = yield* resolveActiveSession(existingSessionThreadId);
const sessionModelSwitch =
currentProvider === undefined
? "in-session"
Expand Down
4 changes: 4 additions & 0 deletions apps/web/src/components/ChatView.logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ export function buildLocalDraftThread(
error,
createdAt: draftThread.createdAt,
latestTurn: null,
latestUserMessageAt: null,
hasPendingApprovals: false,
hasPendingUserInput: false,
hasActionableProposedPlan: false,
lastVisitedAt: draftThread.createdAt,
branch: draftThread.branch,
worktreePath: draftThread.worktreePath,
Expand Down
8 changes: 8 additions & 0 deletions apps/web/src/components/Sidebar.logic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,10 @@ describe("pin helpers", () => {
error: null,
createdAt: "2026-03-09T10:00:00.000Z",
latestTurn: null,
latestUserMessageAt: null,
hasPendingApprovals: false,
hasPendingUserInput: false,
hasActionableProposedPlan: false,
turnDiffSummaries: [],
activities: [],
branch: null,
Expand Down Expand Up @@ -784,6 +788,10 @@ function makeThread(overrides: Partial<Thread> = {}): Thread {
createdAt: "2026-03-09T10:00:00.000Z",
updatedAt: "2026-03-09T10:00:00.000Z",
latestTurn: null,
latestUserMessageAt: null,
hasPendingApprovals: false,
hasPendingUserInput: false,
hasActionableProposedPlan: false,
branch: null,
worktreePath: null,
turnDiffSummaries: [],
Expand Down
4 changes: 4 additions & 0 deletions apps/web/src/focusedChatContext.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ function makeThread(threadId: ThreadId, overrides: Partial<Thread> = {}): Thread
assistantMessageId: null,
sourceProposedPlan: undefined,
},
latestUserMessageAt: null,
hasPendingApprovals: false,
hasPendingUserInput: false,
hasActionableProposedPlan: false,
lastVisitedAt: "2026-04-07T10:01:00.000Z",
branch: null,
worktreePath: null,
Expand Down
4 changes: 4 additions & 0 deletions apps/web/src/notifications/taskCompletion.logic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ function makeThread(overrides: Partial<Thread>): Thread {
completedAt: null,
assistantMessageId: null,
},
latestUserMessageAt: null,
hasPendingApprovals: false,
hasPendingUserInput: false,
hasActionableProposedPlan: false,
lastVisitedAt: "2026-04-05T10:00:00.000Z",
branch: null,
worktreePath: null,
Expand Down
63 changes: 46 additions & 17 deletions apps/web/src/routes/__root.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ThreadId } from "@t3tools/contracts";
import { ThreadId, type OrchestrationEvent } from "@t3tools/contracts";
import { defaultTerminalTitleForCliKind } from "@t3tools/shared/terminalThreads";
import {
Outlet,
Expand Down Expand Up @@ -144,8 +144,46 @@ function wait(ms: number): Promise<void> {
return new Promise((resolve) => window.setTimeout(resolve, ms));
}

function coalesceOrchestrationUiEvents(
events: ReadonlyArray<OrchestrationEvent>,
): OrchestrationEvent[] {
if (events.length < 2) {
return [...events];
}

const coalesced: OrchestrationEvent[] = [];
for (const event of events) {
const previous = coalesced.at(-1);
if (
previous?.type === "thread.message-sent" &&
event.type === "thread.message-sent" &&
previous.payload.threadId === event.payload.threadId &&
previous.payload.messageId === event.payload.messageId
) {
coalesced[coalesced.length - 1] = {
...event,
payload: {
...event.payload,
attachments: event.payload.attachments ?? previous.payload.attachments,
createdAt: previous.payload.createdAt,
text:
!event.payload.streaming && event.payload.text.length > 0
? event.payload.text
: previous.payload.text + event.payload.text,
},
};
continue;
}

coalesced.push(event);
}

return coalesced;
}

function EventRouter() {
const syncServerReadModel = useStore((store) => store.syncServerReadModel);
const applyOrchestrationEvents = useStore((store) => store.applyOrchestrationEvents);
const setProjectExpanded = useStore((store) => store.setProjectExpanded);
const removeOrphanedTerminalStates = useTerminalStateStore(
(store) => store.removeOrphanedTerminalStates,
Expand All @@ -168,6 +206,7 @@ function EventRouter() {
let syncing = false;
let pending = false;
let needsProviderInvalidation = false;
let pendingDomainEvents: OrchestrationEvent[] = [];

const removeOrphanedTerminalsForCurrentState = () => {
const draftThreadIds = Object.keys(
Expand Down Expand Up @@ -234,6 +273,10 @@ function EventRouter() {

const domainEventFlushThrottler = new Throttler(
() => {
if (pendingDomainEvents.length > 0) {
applyOrchestrationEvents(coalesceOrchestrationUiEvents(pendingDomainEvents));
pendingDomainEvents = [];
}
if (needsProviderInvalidation) {
needsProviderInvalidation = false;
void queryClient.invalidateQueries({ queryKey: providerQueryKeys.all });
Expand All @@ -255,25 +298,10 @@ function EventRouter() {
return;
}
latestSequence = event.sequence;
pendingDomainEvents.push(event);
if (event.type === "thread.turn-diff-completed" || event.type === "thread.reverted") {
needsProviderInvalidation = true;
}
if (event.type === "thread.turn-diff-completed") {
useStore.getState().applyThreadTurnDiffCompleted(event.payload.threadId, {
turnId: event.payload.turnId,
completedAt: event.payload.completedAt,
status: event.payload.status,
files: event.payload.files.map((file) => ({
path: file.path,
...(file.kind !== undefined ? { kind: file.kind } : {}),
...(file.additions !== undefined ? { additions: file.additions } : {}),
...(file.deletions !== undefined ? { deletions: file.deletions } : {}),
})),
checkpointRef: event.payload.checkpointRef,
assistantMessageId: event.payload.assistantMessageId ?? undefined,
checkpointTurnCount: event.payload.checkpointTurnCount,
});
}
domainEventFlushThrottler.maybeExecute();
});
const unsubTerminalEvent = api.terminal.onEvent((event) => {
Expand Down Expand Up @@ -378,6 +406,7 @@ function EventRouter() {
unsubServerConfigUpdated();
};
}, [
applyOrchestrationEvents,
navigate,
queryClient,
removeOrphanedTerminalStates,
Expand Down
Loading