diff --git a/.oxlintrc.json b/.oxlintrc.json index de3a72ae11..b242843040 100644 --- a/.oxlintrc.json +++ b/.oxlintrc.json @@ -9,7 +9,7 @@ "**/routeTree.gen.ts" ], "plugins": ["eslint", "oxc", "react", "unicorn", "typescript"], - "jsPlugins": ["./oxlint-plugin-t3code/index.ts"], + "jsPlugins": ["./oxlint-plugin-t3code/index.js"], "categories": { "correctness": "warn", "suspicious": "warn", diff --git a/README.md b/README.md index c439743cea..cf6f31b70b 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,16 @@ brew install --cask t3-code yay -S t3code-bin ``` +## Fork Releases + +If you want desktop releases and auto-updates from a fork or branch such as `main-xavier`, use the `Release Desktop` GitHub Actions workflow with: + +- `target_branch`: the branch you want to release from, for example `main-xavier` +- `publish_cli`: `false` for fork/private desktop-only releases +- `finalize_version_bump`: `false` unless you want the workflow to push the version bump commit back to that branch + +The packaged desktop app will target the current GitHub repository for updater assets during CI builds, so fork releases will update from that fork's GitHub Releases. + ## Some notes We are very very early in this project. Expect bugs. diff --git a/apps/desktop/src/settings/DesktopClientSettings.test.ts b/apps/desktop/src/settings/DesktopClientSettings.test.ts index f666e69286..fec5b34923 100644 --- a/apps/desktop/src/settings/DesktopClientSettings.test.ts +++ b/apps/desktop/src/settings/DesktopClientSettings.test.ts @@ -13,6 +13,7 @@ import * as DesktopClientSettings from "./DesktopClientSettings.ts"; const clientSettings: ClientSettings = { autoOpenPlanSidebar: false, + assistantResponseCopyFormat: "markdown", confirmThreadArchive: true, confirmThreadDelete: false, dismissedProviderUpdateNotificationKeys: [], diff --git a/apps/server/integration/OrchestrationEngineHarness.integration.ts b/apps/server/integration/OrchestrationEngineHarness.integration.ts index 837c32fc4f..f8146c0587 100644 --- a/apps/server/integration/OrchestrationEngineHarness.integration.ts +++ b/apps/server/integration/OrchestrationEngineHarness.integration.ts @@ -216,12 +216,15 @@ export interface OrchestrationIntegrationHarness { timeoutMs?: number, ): Effect.Effect; }; + readonly startReactor: Effect.Effect; readonly dispose: Effect.Effect; } interface MakeOrchestrationIntegrationHarnessOptions { readonly provider?: ProviderDriverKind; readonly realCodex?: boolean; + readonly rootDir?: string; + readonly autoStartReactor?: boolean; } export const makeOrchestrationIntegrationHarness = ( @@ -244,9 +247,11 @@ export const makeOrchestrationIntegrationHarness = ( makeAdapterRegistryMock({ [adapterHarness.provider]: adapterHarness.adapter }), ) : null; - const rootDir = yield* fileSystem.makeTempDirectoryScoped({ - prefix: "t3-orchestration-integration-", - }); + const rootDir = + options?.rootDir ?? + (yield* fileSystem.makeTempDirectoryScoped({ + prefix: "t3-orchestration-integration-", + })); const workspaceDir = path.join(rootDir, "workspace"); const { stateDir, dbPath } = yield* deriveServerPaths(rootDir, undefined).pipe( Effect.provideService(Path.Path, path), @@ -404,9 +409,12 @@ export const makeOrchestrationIntegrationHarness = ( ).pipe(Effect.orDie); const scope = yield* Scope.make("sequential"); - yield* tryRuntimePromise("start OrchestrationReactor", () => + const startReactor = tryRuntimePromise("start OrchestrationReactor", () => runtime.runPromise(reactor.start().pipe(Scope.provide(scope))), ).pipe(Effect.orDie); + if (options?.autoStartReactor !== false) { + yield* startReactor; + } const receiptHistory = yield* Ref.make>([]); yield* Stream.runForEach(runtimeReceiptBus.streamEventsForTest, (receipt) => Ref.update(receiptHistory, (history) => [...history, receipt]).pipe(Effect.asVoid), @@ -546,6 +554,7 @@ export const makeOrchestrationIntegrationHarness = ( waitForDomainEvent, waitForPendingApproval, waitForReceipt, + startReactor, dispose, } satisfies OrchestrationIntegrationHarness; }); diff --git a/apps/server/integration/orchestrationEngine.integration.test.ts b/apps/server/integration/orchestrationEngine.integration.test.ts index e79897c740..61f5fdc122 100644 --- a/apps/server/integration/orchestrationEngine.integration.test.ts +++ b/apps/server/integration/orchestrationEngine.integration.test.ts @@ -1,5 +1,6 @@ // @effect-diagnostics nodeBuiltinImport:off import fs from "node:fs"; +import os from "node:os"; import path from "node:path"; import { @@ -106,6 +107,17 @@ function withHarness( ).pipe(Effect.provide(NodeServices.layer)); } +function withHarnessOptions( + options: Parameters[0], + use: (harness: OrchestrationIntegrationHarness) => Effect.Effect, +) { + return Effect.acquireUseRelease( + makeOrchestrationIntegrationHarness(options), + use, + (harness) => harness.dispose, + ).pipe(Effect.provide(NodeServices.layer)); +} + function withRealCodexHarness( use: (harness: OrchestrationIntegrationHarness) => Effect.Effect, ) { @@ -266,6 +278,90 @@ it.live("runs a single turn end-to-end and persists checkpoint state in sqlite + ), ); +it.live("replays queued follow-ups after orchestration restarts", () => + Effect.gen(function* () { + const rootDir = fs.mkdtempSync(path.join(os.tmpdir(), "t3-orchestration-queue-restart-")); + + yield* withHarnessOptions({ rootDir }, (harness) => + Effect.gen(function* () { + yield* seedProjectAndThread(harness); + + yield* harness.engine.dispatch({ + type: "thread.queued-follow-up.enqueue", + commandId: CommandId.makeUnsafe("cmd-queued-follow-up-restart-enqueue"), + threadId: THREAD_ID, + followUp: { + id: "follow-up-restart-1", + createdAt: nowIso(), + prompt: "Resume this after restart", + attachments: [], + terminalContexts: [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: DEFAULT_MODEL_BY_PROVIDER[ProviderDriverKind.make("codex")] ?? DEFAULT_MODEL, + }, + runtimeMode: "approval-required", + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + lastSendError: null, + }, + createdAt: nowIso(), + }); + + const queuedThread = yield* harness.waitForThread( + THREAD_ID, + (thread) => thread.queuedFollowUps.length === 1, + ); + assert.equal(queuedThread.queuedFollowUps[0]?.prompt, "Resume this after restart"); + }), + ); + + yield* withHarnessOptions({ rootDir, autoStartReactor: false }, (harness) => + Effect.gen(function* () { + yield* harness.adapterHarness!.queueTurnResponseForNextSession({ + events: [ + { + type: "turn.started", + ...runtimeBase("evt-queued-restart-1", "2026-03-28T12:05:00.000Z"), + threadId: THREAD_ID, + turnId: FIXTURE_TURN_ID, + }, + { + type: "message.delta", + ...runtimeBase("evt-queued-restart-2", "2026-03-28T12:05:00.050Z"), + threadId: THREAD_ID, + turnId: FIXTURE_TURN_ID, + delta: "Recovered queued follow-up output.\n", + }, + { + type: "turn.completed", + ...runtimeBase("evt-queued-restart-3", "2026-03-28T12:05:00.100Z"), + threadId: THREAD_ID, + turnId: FIXTURE_TURN_ID, + status: "completed", + }, + ], + }); + + yield* harness.startReactor; + + const recoveredThread = yield* harness.waitForThread( + THREAD_ID, + (thread) => + thread.queuedFollowUps.length === 0 && + thread.messages.some( + (message) => + message.role === "assistant" && + message.text.includes("Recovered queued follow-up output."), + ), + ); + + assert.equal(recoveredThread.queuedFollowUps.length, 0); + }), + ); + }).pipe(Effect.provide(NodeServices.layer)), +); + it.live.skipIf(!process.env.CODEX_BINARY_PATH)( "keeps the same Codex provider thread across runtime mode switches", () => diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts index 7909d5cd6b..baeebe6d47 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.test.ts @@ -148,6 +148,7 @@ describe("OrchestrationEngine", () => { archivedAt: null, deletedAt: null, messages: [], + queuedFollowUps: [], proposedPlans: [], activities: [], checkpoints: [], diff --git a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts index cf8407bd21..94469d5780 100644 --- a/apps/server/src/orchestration/Layers/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Layers/OrchestrationEngine.ts @@ -307,9 +307,13 @@ const makeOrchestrationEngine = Effect.gen(function* () { return yield* Deferred.await(result); }); + const getReadModel: OrchestrationEngineShape["getReadModel"] = () => + Effect.sync(() => commandReadModel); + return { readEvents, dispatch, + getReadModel, // Each access creates a fresh PubSub subscription so that multiple // consumers (wsServer, ProviderRuntimeIngestion, CheckpointReactor, etc.) // each independently receive all domain events. diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 369eea0f7a..0943fa87b1 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -931,6 +931,496 @@ it.layer( it.layer(Layer.fresh(makeProjectionPipelinePrefixedTestLayer("t3-projection-attachments-revert-")))( "OrchestrationProjectionPipeline", (it) => { + it.effect( + "clears queued follow-ups and prunes queued attachment files when a thread is reverted", + () => + Effect.gen(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const { attachmentsDir } = yield* ServerConfig; + const now = new Date().toISOString(); + const threadId = ThreadId.makeUnsafe("thread-queued-revert"); + const queuedAttachmentId = "thread-queued-revert-00000000-0000-4000-8000-000000000001"; + const otherThreadAttachmentId = + "thread-queued-revert-extra-00000000-0000-4000-8000-000000000002"; + + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + yield* appendAndProject({ + type: "project.created", + eventId: EventId.makeUnsafe("evt-queued-revert-1"), + aggregateKind: "project", + aggregateId: ProjectId.makeUnsafe("project-queued-revert"), + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-revert-1"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-revert-1"), + metadata: {}, + payload: { + projectId: ProjectId.makeUnsafe("project-queued-revert"), + title: "Project Queued Revert", + workspaceRoot: "/tmp/project-queued-revert", + defaultModelSelection: null, + scripts: [], + createdAt: now, + updatedAt: now, + }, + }); + + yield* appendAndProject({ + type: "thread.created", + eventId: EventId.makeUnsafe("evt-queued-revert-2"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-revert-2"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-revert-2"), + metadata: {}, + payload: { + threadId, + projectId: ProjectId.makeUnsafe("project-queued-revert"), + title: "Thread Queued Revert", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + }, + }); + + yield* appendAndProject({ + type: "thread.queued-follow-up-enqueued", + eventId: EventId.makeUnsafe("evt-queued-revert-3"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-revert-3"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-revert-3"), + metadata: {}, + payload: { + threadId, + createdAt: now, + followUp: { + id: "queued-follow-up-1", + createdAt: now, + prompt: "queued prompt", + attachments: [ + { + type: "image", + id: queuedAttachmentId, + name: "queued.png", + mimeType: "image/png", + sizeBytes: 5, + }, + ], + terminalContexts: [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + }, + }); + + const queuedAttachmentPath = path.join(attachmentsDir, `${queuedAttachmentId}.png`); + const otherThreadAttachmentPath = path.join( + attachmentsDir, + `${otherThreadAttachmentId}.png`, + ); + yield* fileSystem.makeDirectory(attachmentsDir, { recursive: true }); + yield* fileSystem.writeFileString(queuedAttachmentPath, "queued"); + yield* fileSystem.writeFileString(otherThreadAttachmentPath, "other-thread"); + assert.isTrue(yield* exists(queuedAttachmentPath)); + assert.isTrue(yield* exists(otherThreadAttachmentPath)); + + yield* appendAndProject({ + type: "thread.reverted", + eventId: EventId.makeUnsafe("evt-queued-revert-4"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-revert-4"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-revert-4"), + metadata: {}, + payload: { + threadId, + turnCount: 0, + }, + }); + + const queuedRows = yield* sql<{ readonly followUpId: string }>` + SELECT follow_up_id AS "followUpId" + FROM projection_thread_queued_follow_ups + WHERE thread_id = ${threadId} + `; + assert.deepEqual(queuedRows, []); + assert.isFalse(yield* exists(queuedAttachmentPath)); + assert.isTrue(yield* exists(otherThreadAttachmentPath)); + }), + ); + + it.effect("prunes removed queued follow-up attachment files on queue mutation", () => + Effect.gen(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const { attachmentsDir } = yield* ServerConfig; + const now = new Date().toISOString(); + const threadId = ThreadId.makeUnsafe("thread-queued-remove"); + const removedAttachmentId = "thread-queued-remove-00000000-0000-4000-8000-000000000001"; + const keptAttachmentId = "thread-queued-remove-00000000-0000-4000-8000-000000000002"; + + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + yield* appendAndProject({ + type: "project.created", + eventId: EventId.makeUnsafe("evt-queued-remove-1"), + aggregateKind: "project", + aggregateId: ProjectId.makeUnsafe("project-queued-remove"), + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-remove-1"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-remove-1"), + metadata: {}, + payload: { + projectId: ProjectId.makeUnsafe("project-queued-remove"), + title: "Project Queued Remove", + workspaceRoot: "/tmp/project-queued-remove", + defaultModelSelection: null, + scripts: [], + createdAt: now, + updatedAt: now, + }, + }); + + yield* appendAndProject({ + type: "thread.created", + eventId: EventId.makeUnsafe("evt-queued-remove-2"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-remove-2"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-remove-2"), + metadata: {}, + payload: { + threadId, + projectId: ProjectId.makeUnsafe("project-queued-remove"), + title: "Thread Queued Remove", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + }, + }); + + yield* appendAndProject({ + type: "thread.queued-follow-up-enqueued", + eventId: EventId.makeUnsafe("evt-queued-remove-3"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-remove-3"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-remove-3"), + metadata: {}, + payload: { + threadId, + createdAt: now, + followUp: { + id: "queued-follow-up-remove", + createdAt: now, + prompt: "remove me", + attachments: [ + { + type: "image", + id: removedAttachmentId, + name: "remove.png", + mimeType: "image/png", + sizeBytes: 5, + }, + ], + terminalContexts: [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + }, + }); + + yield* appendAndProject({ + type: "thread.queued-follow-up-enqueued", + eventId: EventId.makeUnsafe("evt-queued-remove-4"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-remove-4"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-remove-4"), + metadata: {}, + payload: { + threadId, + createdAt: now, + followUp: { + id: "queued-follow-up-keep", + createdAt: now, + prompt: "keep me", + attachments: [ + { + type: "image", + id: keptAttachmentId, + name: "keep.png", + mimeType: "image/png", + sizeBytes: 5, + }, + ], + terminalContexts: [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + }, + }); + + const removedAttachmentPath = path.join(attachmentsDir, `${removedAttachmentId}.png`); + const keptAttachmentPath = path.join(attachmentsDir, `${keptAttachmentId}.png`); + yield* fileSystem.makeDirectory(attachmentsDir, { recursive: true }); + yield* fileSystem.writeFileString(removedAttachmentPath, "remove"); + yield* fileSystem.writeFileString(keptAttachmentPath, "keep"); + assert.isTrue(yield* exists(removedAttachmentPath)); + assert.isTrue(yield* exists(keptAttachmentPath)); + + yield* appendAndProject({ + type: "thread.queued-follow-up-removed", + eventId: EventId.makeUnsafe("evt-queued-remove-5"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-remove-5"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-remove-5"), + metadata: {}, + payload: { + threadId, + followUpId: "queued-follow-up-remove", + createdAt: now, + }, + }); + + assert.isFalse(yield* exists(removedAttachmentPath)); + assert.isTrue(yield* exists(keptAttachmentPath)); + }), + ); + + it.effect( + "prunes replaced queued follow-up attachments on enqueue upsert and bumps thread recency", + () => + Effect.gen(function* () { + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const { attachmentsDir } = yield* ServerConfig; + const now = "2026-03-29T02:00:00.000Z"; + const later = "2026-03-29T02:00:05.000Z"; + const threadId = ThreadId.makeUnsafe("thread-queued-upsert"); + const originalAttachmentId = "thread-queued-upsert-00000000-0000-4000-8000-000000000001"; + const replacementAttachmentId = + "thread-queued-upsert-00000000-0000-4000-8000-000000000002"; + + const appendAndProject = (event: Parameters[0]) => + eventStore + .append(event) + .pipe(Effect.flatMap((savedEvent) => projectionPipeline.projectEvent(savedEvent))); + + yield* appendAndProject({ + type: "project.created", + eventId: EventId.makeUnsafe("evt-queued-upsert-1"), + aggregateKind: "project", + aggregateId: ProjectId.makeUnsafe("project-queued-upsert"), + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-upsert-1"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-upsert-1"), + metadata: {}, + payload: { + projectId: ProjectId.makeUnsafe("project-queued-upsert"), + title: "Project Queued Upsert", + workspaceRoot: "/tmp/project-queued-upsert", + defaultModelSelection: null, + scripts: [], + createdAt: now, + updatedAt: now, + }, + }); + + yield* appendAndProject({ + type: "thread.created", + eventId: EventId.makeUnsafe("evt-queued-upsert-2"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-upsert-2"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-upsert-2"), + metadata: {}, + payload: { + threadId, + projectId: ProjectId.makeUnsafe("project-queued-upsert"), + title: "Thread Queued Upsert", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + }, + }); + + yield* appendAndProject({ + type: "thread.queued-follow-up-enqueued", + eventId: EventId.makeUnsafe("evt-queued-upsert-3"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-queued-upsert-3"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-upsert-3"), + metadata: {}, + payload: { + threadId, + createdAt: now, + followUp: { + id: "queued-follow-up-upsert", + createdAt: now, + prompt: "original queued prompt", + attachments: [ + { + type: "image", + id: originalAttachmentId, + name: "original.png", + mimeType: "image/png", + sizeBytes: 5, + }, + ], + terminalContexts: [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + }, + }); + + const originalAttachmentPath = path.join(attachmentsDir, `${originalAttachmentId}.png`); + const replacementAttachmentPath = path.join( + attachmentsDir, + `${replacementAttachmentId}.png`, + ); + yield* fileSystem.makeDirectory(attachmentsDir, { recursive: true }); + yield* fileSystem.writeFileString(originalAttachmentPath, "original"); + yield* fileSystem.writeFileString(replacementAttachmentPath, "replacement"); + assert.isTrue(yield* exists(originalAttachmentPath)); + assert.isTrue(yield* exists(replacementAttachmentPath)); + + yield* appendAndProject({ + type: "thread.queued-follow-up-enqueued", + eventId: EventId.makeUnsafe("evt-queued-upsert-4"), + aggregateKind: "thread", + aggregateId: threadId, + occurredAt: later, + commandId: CommandId.makeUnsafe("cmd-queued-upsert-4"), + causationEventId: null, + correlationId: CorrelationId.makeUnsafe("cmd-queued-upsert-4"), + metadata: {}, + payload: { + threadId, + createdAt: later, + followUp: { + id: "queued-follow-up-upsert", + createdAt: now, + prompt: "replacement queued prompt", + attachments: [ + { + type: "image", + id: replacementAttachmentId, + name: "replacement.png", + mimeType: "image/png", + sizeBytes: 5, + }, + ], + terminalContexts: [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + }, + }); + + assert.isFalse(yield* exists(originalAttachmentPath)); + assert.isTrue(yield* exists(replacementAttachmentPath)); + + const threadRows = yield* sql<{ readonly updatedAt: string }>` + SELECT updated_at AS "updatedAt" + FROM projection_threads + WHERE thread_id = ${threadId} + `; + assert.deepEqual(threadRows, [{ updatedAt: later }]); + }), + ); + it.effect("removes thread attachment directory when thread is deleted", () => Effect.gen(function* () { const fileSystem = yield* FileSystem.FileSystem; diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index 1161ff6a7d..dcedb82bbf 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -32,6 +32,10 @@ import { type ProjectionTurn, ProjectionTurnRepository, } from "../../persistence/Services/ProjectionTurns.ts"; +import { + type ProjectionThreadQueuedFollowUp, + ProjectionThreadQueuedFollowUpRepository, +} from "../../persistence/Services/ProjectionThreadQueuedFollowUps.ts"; import { ProjectionThreadRepository } from "../../persistence/Services/ProjectionThreads.ts"; import { ProjectionPendingApprovalRepositoryLive } from "../../persistence/Layers/ProjectionPendingApprovals.ts"; import { ProjectionProjectRepositoryLive } from "../../persistence/Layers/ProjectionProjects.ts"; @@ -41,6 +45,7 @@ import { ProjectionThreadMessageRepositoryLive } from "../../persistence/Layers/ import { ProjectionThreadProposedPlanRepositoryLive } from "../../persistence/Layers/ProjectionThreadProposedPlans.ts"; import { ProjectionThreadSessionRepositoryLive } from "../../persistence/Layers/ProjectionThreadSessions.ts"; import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts"; +import { ProjectionThreadQueuedFollowUpRepositoryLive } from "../../persistence/Layers/ProjectionThreadQueuedFollowUps.ts"; import { ProjectionThreadRepositoryLive } from "../../persistence/Layers/ProjectionThreads.ts"; import { ServerConfig } from "../../config.ts"; import { @@ -61,6 +66,7 @@ export const ORCHESTRATION_PROJECTOR_NAMES = { threadProposedPlans: "projection.thread-proposed-plans", threadActivities: "projection.thread-activities", threadSessions: "projection.thread-sessions", + queuedFollowUps: "projection.thread-queued-follow-ups", threadTurns: "projection.thread-turns", checkpoints: "projection.checkpoints", pendingApprovals: "projection.pending-approvals", @@ -325,6 +331,37 @@ function collectThreadAttachmentRelativePaths( return relativePaths; } +function collectQueuedFollowUpAttachmentRelativePaths( + threadId: string, + followUps: ReadonlyArray, +): Set { + const threadSegment = toSafeThreadAttachmentSegment(threadId); + if (!threadSegment) { + return new Set(); + } + const relativePaths = new Set(); + for (const followUp of followUps) { + for (const attachment of followUp.attachments ?? []) { + if (attachment.type !== "image") { + continue; + } + const attachmentThreadSegment = parseThreadSegmentFromAttachmentId(attachment.id); + if (!attachmentThreadSegment || attachmentThreadSegment !== threadSegment) { + continue; + } + relativePaths.add(attachmentRelativePath(attachment)); + } + } + return relativePaths; +} + +function mergeThreadAttachmentRelativePaths( + messagePaths: ReadonlySet, + queuedFollowUpPaths: ReadonlySet, +): Set { + return new Set([...messagePaths, ...queuedFollowUpPaths]); +} + const runAttachmentSideEffects = Effect.fn("runAttachmentSideEffects")(function* ( sideEffects: AttachmentSideEffects, ) { @@ -455,6 +492,8 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti const projectionThreadActivityRepository = yield* ProjectionThreadActivityRepository; const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository; const projectionTurnRepository = yield* ProjectionTurnRepository; + const projectionThreadQueuedFollowUpRepository = + yield* ProjectionThreadQueuedFollowUpRepository; const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository; const fileSystem = yield* FileSystem.FileSystem; @@ -1236,6 +1275,214 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti } }); + const applyQueuedFollowUpsProjection: ProjectorDefinition["apply"] = Effect.fn( + "applyQueuedFollowUpsProjection", + )(function* (event, attachmentSideEffects) { + const replaceThreadQueue = ( + threadId: ProjectionThreadQueuedFollowUp["threadId"], + followUps: ReadonlyArray, + ) => + projectionThreadQueuedFollowUpRepository.replaceByThreadId({ + threadId, + followUps: followUps.map((followUp, index) => ({ + ...followUp, + queuePosition: index, + })), + }); + const syncQueuedFollowUpAttachmentPaths = Effect.fn(function* ( + threadId: ProjectionThreadQueuedFollowUp["threadId"], + followUps: ReadonlyArray, + ) { + const messageRows = yield* projectionThreadMessageRepository.listByThreadId({ + threadId, + }); + attachmentSideEffects.prunedThreadRelativePaths.set( + threadId, + mergeThreadAttachmentRelativePaths( + collectThreadAttachmentRelativePaths(threadId, messageRows), + collectQueuedFollowUpAttachmentRelativePaths(threadId, followUps), + ), + ); + }); + const touchProjectedThreadUpdatedAt = Effect.fn(function* ( + threadId: ProjectionThreadQueuedFollowUp["threadId"], + updatedAt: string, + ) { + const existingThread = yield* projectionThreadRepository.getById({ threadId }); + if (Option.isNone(existingThread)) { + return; + } + yield* projectionThreadRepository.upsert({ + ...existingThread.value, + updatedAt, + }); + }); + + switch (event.type) { + case "thread.queued-follow-up-enqueued": { + const current = yield* projectionThreadQueuedFollowUpRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const nextFollowUp: ProjectionThreadQueuedFollowUp = { + followUpId: event.payload.followUp.id, + threadId: event.payload.threadId, + queuePosition: current.length, + createdAt: event.payload.followUp.createdAt, + updatedAt: event.payload.createdAt, + prompt: event.payload.followUp.prompt, + attachments: event.payload.followUp.attachments, + terminalContexts: event.payload.followUp.terminalContexts, + modelSelection: event.payload.followUp.modelSelection, + runtimeMode: event.payload.followUp.runtimeMode, + interactionMode: event.payload.followUp.interactionMode, + lastSendError: event.payload.followUp.lastSendError, + }; + const withoutExisting = current.filter( + (followUp) => followUp.followUpId !== nextFollowUp.followUpId, + ); + const targetIndex = + event.payload.targetIndex === undefined + ? withoutExisting.length + : Math.max(0, Math.min(event.payload.targetIndex, withoutExisting.length)); + yield* replaceThreadQueue(event.payload.threadId, [ + ...withoutExisting.slice(0, targetIndex), + nextFollowUp, + ...withoutExisting.slice(targetIndex), + ]); + yield* syncQueuedFollowUpAttachmentPaths(event.payload.threadId, [ + ...withoutExisting.slice(0, targetIndex), + nextFollowUp, + ...withoutExisting.slice(targetIndex), + ]); + yield* touchProjectedThreadUpdatedAt(event.payload.threadId, event.occurredAt); + return; + } + + case "thread.queued-follow-up-updated": { + const current = yield* projectionThreadQueuedFollowUpRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const nextFollowUps = current.map((followUp) => + followUp.followUpId === event.payload.followUp.id + ? Object.assign({}, followUp, { + updatedAt: event.payload.createdAt, + prompt: event.payload.followUp.prompt, + attachments: event.payload.followUp.attachments, + terminalContexts: event.payload.followUp.terminalContexts, + modelSelection: event.payload.followUp.modelSelection, + runtimeMode: event.payload.followUp.runtimeMode, + interactionMode: event.payload.followUp.interactionMode, + lastSendError: event.payload.followUp.lastSendError, + }) + : followUp, + ); + yield* replaceThreadQueue(event.payload.threadId, nextFollowUps); + yield* syncQueuedFollowUpAttachmentPaths(event.payload.threadId, nextFollowUps); + yield* touchProjectedThreadUpdatedAt(event.payload.threadId, event.occurredAt); + return; + } + + case "thread.queued-follow-up-removed": { + const current = yield* projectionThreadQueuedFollowUpRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const nextFollowUps = current.filter( + (followUp) => followUp.followUpId !== event.payload.followUpId, + ); + yield* replaceThreadQueue(event.payload.threadId, nextFollowUps); + yield* syncQueuedFollowUpAttachmentPaths(event.payload.threadId, nextFollowUps); + yield* touchProjectedThreadUpdatedAt(event.payload.threadId, event.occurredAt); + return; + } + + case "thread.queued-follow-up-reordered": { + const current = yield* projectionThreadQueuedFollowUpRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const currentIndex = current.findIndex( + (followUp) => followUp.followUpId === event.payload.followUpId, + ); + if (currentIndex < 0) { + return; + } + const boundedTargetIndex = Math.max( + 0, + Math.min(event.payload.targetIndex, current.length - 1), + ); + const nextQueuedFollowUps = [...current]; + const [movedFollowUp] = nextQueuedFollowUps.splice(currentIndex, 1); + if (!movedFollowUp) { + return; + } + nextQueuedFollowUps.splice(boundedTargetIndex, 0, { + ...movedFollowUp, + updatedAt: event.payload.createdAt, + }); + yield* replaceThreadQueue(event.payload.threadId, nextQueuedFollowUps); + yield* touchProjectedThreadUpdatedAt(event.payload.threadId, event.occurredAt); + return; + } + + case "thread.queued-follow-up-send-failed": { + const current = yield* projectionThreadQueuedFollowUpRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + yield* replaceThreadQueue( + event.payload.threadId, + current.map((followUp) => + followUp.followUpId === event.payload.followUpId + ? Object.assign({}, followUp, { + updatedAt: event.payload.createdAt, + lastSendError: event.payload.lastSendError, + }) + : followUp, + ), + ); + yield* touchProjectedThreadUpdatedAt(event.payload.threadId, event.occurredAt); + return; + } + + case "thread.queued-follow-up-send-error-cleared": { + const current = yield* projectionThreadQueuedFollowUpRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + yield* replaceThreadQueue( + event.payload.threadId, + current.map((followUp) => + followUp.followUpId === event.payload.followUpId + ? Object.assign({}, followUp, { + updatedAt: event.payload.createdAt, + lastSendError: null, + }) + : followUp, + ), + ); + yield* touchProjectedThreadUpdatedAt(event.payload.threadId, event.occurredAt); + return; + } + + case "thread.deleted": + case "thread.reverted": { + yield* projectionThreadQueuedFollowUpRepository.deleteByThreadId({ + threadId: event.payload.threadId, + }); + if (event.type === "thread.reverted") { + const messageRows = yield* projectionThreadMessageRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + attachmentSideEffects.prunedThreadRelativePaths.set( + event.payload.threadId, + collectThreadAttachmentRelativePaths(event.payload.threadId, messageRows), + ); + } + return; + } + + default: + return; + } + }); + const applyCheckpointsProjection: ProjectorDefinition["apply"] = () => Effect.void; const applyPendingApprovalsProjection: ProjectorDefinition["apply"] = Effect.fn( @@ -1383,6 +1630,10 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti name: ORCHESTRATION_PROJECTOR_NAMES.threadSessions, apply: applyThreadSessionsProjection, }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.queuedFollowUps, + apply: applyQueuedFollowUpsProjection, + }, { name: ORCHESTRATION_PROJECTOR_NAMES.threadTurns, apply: applyThreadTurnsProjection, @@ -1499,6 +1750,7 @@ export const OrchestrationProjectionPipelineLive = Layer.effect( Layer.provideMerge(ProjectionThreadProposedPlanRepositoryLive), Layer.provideMerge(ProjectionThreadActivityRepositoryLive), Layer.provideMerge(ProjectionThreadSessionRepositoryLive), + Layer.provideMerge(ProjectionThreadQueuedFollowUpRepositoryLive), Layer.provideMerge(ProjectionTurnRepositoryLive), Layer.provideMerge(ProjectionPendingApprovalRepositoryLive), Layer.provideMerge(ProjectionStateRepositoryLive), diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index 7db2a23e5e..a2f16ddc3e 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -43,6 +43,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { yield* sql`DELETE FROM projection_projects`; yield* sql`DELETE FROM projection_state`; yield* sql`DELETE FROM projection_thread_proposed_plans`; + yield* sql`DELETE FROM projection_thread_queued_follow_ups`; yield* sql`DELETE FROM projection_turns`; yield* sql` @@ -176,6 +177,37 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { ) `; + yield* sql` + INSERT INTO projection_thread_queued_follow_ups ( + follow_up_id, + thread_id, + queue_position, + created_at, + updated_at, + prompt, + attachments_json, + terminal_contexts_json, + model_selection_json, + runtime_mode, + interaction_mode, + last_send_error + ) + VALUES ( + 'queued-follow-up-1', + 'thread-1', + 0, + '2026-02-24T00:00:06.250Z', + '2026-02-24T00:00:06.250Z', + 'follow up after this turn', + '[]', + '[]', + '{"provider":"codex","model":"gpt-5-codex"}', + 'full-access', + 'default', + NULL + ) + `; + yield* sql` INSERT INTO projection_thread_sessions ( thread_id, @@ -332,6 +364,23 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { updatedAt: "2026-02-24T00:00:05.500Z", }, ], + queuedFollowUps: [ + { + id: "queued-follow-up-1", + createdAt: "2026-02-24T00:00:06.250Z", + prompt: "follow up after this turn", + attachments: [], + terminalContexts: [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + ], activities: [ { id: asEventId("activity-1"), diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 9b3c0fa7ad..4b3d54a89e 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -1186,6 +1186,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { deletedAt: row.deletedAt, messages: messagesByThread.get(row.threadId) ?? [], proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], + queuedFollowUps: [], activities: activitiesByThread.get(row.threadId) ?? [], checkpoints: checkpointsByThread.get(row.threadId) ?? [], session: sessionsByThread.get(row.threadId) ?? null, @@ -1384,6 +1385,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { deletedAt: row.deletedAt, messages: [], proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], + queuedFollowUps: [], activities: [], checkpoints: [], session: sessionByThread.get(row.threadId) ?? null, diff --git a/apps/server/src/orchestration/Layers/QueuedFollowUpReactor.test.ts b/apps/server/src/orchestration/Layers/QueuedFollowUpReactor.test.ts new file mode 100644 index 0000000000..6846b4ea6d --- /dev/null +++ b/apps/server/src/orchestration/Layers/QueuedFollowUpReactor.test.ts @@ -0,0 +1,514 @@ +import { + CommandId, + DEFAULT_PROVIDER_INTERACTION_MODE, + DEFAULT_RUNTIME_MODE, + EventId, + type OrchestrationEvent, + type OrchestrationThreadActivity, + type OrchestrationSessionStatus, + ProjectId, + ThreadId, + type OrchestrationCommand, + type OrchestrationReadModel, + ProviderInstanceId, +} from "@t3tools/contracts"; +import { Effect, Exit, Layer, ManagedRuntime, Scope, Stream } from "effect"; +import { afterEach, describe, expect, it } from "vitest"; + +import { + OrchestrationEngineService, + type OrchestrationEngineShape, +} from "../Services/OrchestrationEngine.ts"; +import { QueuedFollowUpReactor } from "../Services/QueuedFollowUpReactor.ts"; +import { QueuedFollowUpReactorLive } from "./QueuedFollowUpReactor.ts"; + +const NOW_ISO = "2026-03-28T12:00:00.000Z"; + +function makeReadModel(input?: { + sessionStatus?: OrchestrationSessionStatus | null; + lastSendError?: string | null; + queuedPrompts?: ReadonlyArray; + queuedAttachments?: ReadonlyArray< + OrchestrationReadModel["threads"][number]["queuedFollowUps"][number]["attachments"] + >; + queuedTerminalContexts?: ReadonlyArray< + OrchestrationReadModel["threads"][number]["queuedFollowUps"][number]["terminalContexts"] + >; + latestTurnState?: OrchestrationReadModel["threads"][number]["latestTurn"]; + activities?: ReadonlyArray; +}): OrchestrationReadModel { + const queuedPrompts = input?.queuedPrompts ?? ["send this next"]; + return { + snapshotSequence: 1, + updatedAt: NOW_ISO, + projects: [ + { + id: ProjectId.makeUnsafe("project-1"), + title: "Project", + workspaceRoot: "/tmp/project", + defaultModelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + scripts: [], + createdAt: NOW_ISO, + updatedAt: NOW_ISO, + deletedAt: null, + }, + ], + threads: [ + { + id: ThreadId.makeUnsafe("thread-1"), + projectId: ProjectId.makeUnsafe("project-1"), + title: "Thread", + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + runtimeMode: DEFAULT_RUNTIME_MODE, + branch: null, + worktreePath: null, + createdAt: NOW_ISO, + updatedAt: NOW_ISO, + archivedAt: null, + deletedAt: null, + messages: [], + queuedFollowUps: queuedPrompts.map((prompt, index) => ({ + id: `follow-up-${index + 1}`, + createdAt: NOW_ISO, + prompt, + attachments: input?.queuedAttachments?.[index] ?? [], + terminalContexts: input?.queuedTerminalContexts?.[index] ?? [], + modelSelection: { + instanceId: ProviderInstanceId.make("codex"), + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: DEFAULT_RUNTIME_MODE, + interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE, + lastSendError: input?.lastSendError ?? null, + })), + proposedPlans: [], + activities: [...(input?.activities ?? [])], + checkpoints: [], + latestTurn: input?.latestTurnState ?? null, + session: + input?.sessionStatus === null + ? null + : { + threadId: ThreadId.makeUnsafe("thread-1"), + status: input?.sessionStatus ?? "ready", + providerName: "codex", + runtimeMode: DEFAULT_RUNTIME_MODE, + activeTurnId: null, + lastError: null, + updatedAt: NOW_ISO, + }, + }, + ], + }; +} + +describe("QueuedFollowUpReactor", () => { + let runtime: ManagedRuntime.ManagedRuntime | null = null; + + afterEach(async () => { + if (runtime) { + await runtime.dispose(); + } + runtime = null; + }); + + it("dispatches the queued head and removes it when the thread is sendable", async () => { + const dispatched: OrchestrationCommand[] = []; + const engine: OrchestrationEngineShape = { + getReadModel: () => Effect.succeed(makeReadModel()), + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.empty, + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + expect(dispatched.map((command) => command.type)).toEqual([ + "thread.turn.start", + "thread.queued-follow-up.remove", + ]); + const turnStart = dispatched[0]; + expect(turnStart?.type).toBe("thread.turn.start"); + if (turnStart?.type !== "thread.turn.start") { + throw new Error("Expected first command to be thread.turn.start"); + } + expect(turnStart.message.text).toBe("send this next"); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); + + it("injects queued terminal contexts into the dispatched prompt", async () => { + const dispatched: OrchestrationCommand[] = []; + const engine: OrchestrationEngineShape = { + getReadModel: () => + Effect.succeed( + makeReadModel({ + queuedPrompts: ["Investigate this"], + queuedTerminalContexts: [ + [ + { + id: "ctx-1", + threadId: ThreadId.makeUnsafe("thread-1"), + createdAt: NOW_ISO, + terminalId: "default", + terminalLabel: "Terminal 1", + lineStart: 3, + lineEnd: 6, + text: "\n\nalpha\nbeta", + }, + ], + ], + }), + ), + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.empty, + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + const turnStart = dispatched[0]; + expect(turnStart?.type).toBe("thread.turn.start"); + if (turnStart?.type !== "thread.turn.start") { + throw new Error("Expected first command to be thread.turn.start"); + } + expect(turnStart.message.text).toContain("Investigate this"); + expect(turnStart.message.text).toContain(""); + expect(turnStart.message.text).toContain("- Terminal 1 lines 3-6:"); + expect(turnStart.message.text).toContain("5 | alpha"); + expect(turnStart.message.text).toContain("6 | beta"); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); + + it("uses the image-only fallback prompt for queued image-only sends", async () => { + const dispatched: OrchestrationCommand[] = []; + const engine: OrchestrationEngineShape = { + getReadModel: () => + Effect.succeed( + makeReadModel({ + queuedPrompts: [""], + queuedAttachments: [ + [ + { + type: "image", + id: "thread-1-att-1", + name: "queued.png", + mimeType: "image/png", + sizeBytes: 128, + }, + ], + ], + }), + ), + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.empty, + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + const turnStart = dispatched[0]; + expect(turnStart?.type).toBe("thread.turn.start"); + if (turnStart?.type !== "thread.turn.start") { + throw new Error("Expected first command to be thread.turn.start"); + } + expect(turnStart.message.text).toBe( + "[User attached one or more images without additional text. Respond using the conversation context and the attached image(s).]", + ); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); + + it("records a send failure and keeps the queued item when dispatch fails", async () => { + const dispatched: OrchestrationCommand[] = []; + const engine: OrchestrationEngineShape = { + getReadModel: () => Effect.succeed(makeReadModel()), + readEvents: () => Stream.empty, + dispatch: (command) => { + dispatched.push(command); + if (command.type === "thread.turn.start") { + return Effect.fail({ _tag: "InvalidCommand" } as never); + } + return Effect.succeed({ sequence: dispatched.length }); + }, + streamDomainEvents: Stream.empty, + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + expect(dispatched.map((command) => command.type)).toEqual([ + "thread.turn.start", + "thread.queued-follow-up.send-failed", + ]); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); + + it("does not dispatch while the thread session is still running", async () => { + const dispatched: OrchestrationCommand[] = []; + const engine: OrchestrationEngineShape = { + getReadModel: () => Effect.succeed(makeReadModel({ sessionStatus: "running" })), + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.empty, + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + expect(dispatched).toEqual([]); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); + + it("does not dispatch while a pending approval is open", async () => { + const dispatched: OrchestrationCommand[] = []; + const engine: OrchestrationEngineShape = { + getReadModel: () => + Effect.succeed( + makeReadModel({ + activities: [ + { + id: EventId.makeUnsafe("activity-approval-open"), + kind: "approval.requested", + tone: "info", + summary: "Approval required", + turnId: null, + createdAt: NOW_ISO, + payload: { + requestId: "approval-request-1", + requestKind: "command", + }, + }, + ], + }), + ), + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.empty, + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + expect(dispatched).toEqual([]); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); + + it("does not dispatch while a pending user-input request is open", async () => { + const dispatched: OrchestrationCommand[] = []; + const engine: OrchestrationEngineShape = { + getReadModel: () => + Effect.succeed( + makeReadModel({ + activities: [ + { + id: EventId.makeUnsafe("activity-user-input-open"), + kind: "user-input.requested", + tone: "info", + summary: "Need more input", + turnId: null, + createdAt: NOW_ISO, + payload: { + requestId: "user-input-request-1", + questions: [ + { + id: "question-1", + header: "Pick one", + question: "Which option?", + options: [ + { + label: "A", + description: "Option A", + }, + ], + }, + ], + }, + }, + ], + }), + ), + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.empty, + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + expect(dispatched).toEqual([]); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); + + it("does not dispatch the rest of the queue before the previous queued send settles", async () => { + const dispatched: OrchestrationCommand[] = []; + let readModel = makeReadModel({ + queuedPrompts: ["first", "second", "third"], + }); + const threadEvent = { + eventId: EventId.makeUnsafe("evt-queued-follow-up-reactor"), + sequence: 1, + type: "thread.queued-follow-up-enqueued", + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-1"), + occurredAt: NOW_ISO, + commandId: CommandId.makeUnsafe("cmd-queued-follow-up-reactor"), + causationEventId: null, + correlationId: "corr-queued-follow-up-reactor", + payload: { + createdAt: NOW_ISO, + threadId: ThreadId.makeUnsafe("thread-1"), + followUp: readModel.threads[0]!.queuedFollowUps[0]!, + }, + metadata: {}, + } as unknown as OrchestrationEvent; + const engine: OrchestrationEngineShape = { + getReadModel: () => Effect.succeed(readModel), + readEvents: () => Stream.empty, + dispatch: (command) => + Effect.sync(() => { + dispatched.push(command); + if (command.type === "thread.queued-follow-up.remove") { + const nextQueuedFollowUps = readModel.threads[0]!.queuedFollowUps.filter( + (followUp) => followUp.id !== command.followUpId, + ); + readModel = { + ...readModel, + threads: [ + { + ...readModel.threads[0]!, + queuedFollowUps: nextQueuedFollowUps, + }, + ], + }; + } + return { sequence: dispatched.length }; + }), + streamDomainEvents: Stream.fromIterable([threadEvent, threadEvent]), + }; + + runtime = ManagedRuntime.make( + QueuedFollowUpReactorLive.pipe( + Layer.provide(Layer.succeed(OrchestrationEngineService, engine)), + ), + ); + + const reactor = await runtime.runPromise(Effect.service(QueuedFollowUpReactor)); + const scope = await Effect.runPromise(Scope.make("sequential")); + + await Effect.runPromise(reactor.start.pipe(Scope.provide(scope))); + await runtime.runPromise(reactor.drain); + + expect(dispatched.map((command) => command.type)).toEqual([ + "thread.turn.start", + "thread.queued-follow-up.remove", + ]); + + await Effect.runPromise(Scope.close(scope, Exit.void)); + }); +}); diff --git a/apps/server/src/orchestration/Layers/QueuedFollowUpReactor.ts b/apps/server/src/orchestration/Layers/QueuedFollowUpReactor.ts new file mode 100644 index 0000000000..83e62b00e7 --- /dev/null +++ b/apps/server/src/orchestration/Layers/QueuedFollowUpReactor.ts @@ -0,0 +1,212 @@ +import { CommandId, MessageId, type OrchestrationEvent, type ThreadId } from "@t3tools/contracts"; +import { makeDrainableWorker } from "@t3tools/shared/DrainableWorker"; +import { + buildQueuedFollowUpMessageText, + canDispatchQueuedFollowUp, +} from "@t3tools/shared/orchestration"; +import * as Cause from "effect/Cause"; +import * as Effect from "effect/Effect"; +import * as Exit from "effect/Exit"; +import * as Layer from "effect/Layer"; +import * as Random from "effect/Random"; +import * as Stream from "effect/Stream"; + +import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; +import { + QueuedFollowUpReactor, + type QueuedFollowUpReactorShape, +} from "../Services/QueuedFollowUpReactor.ts"; + +const serverCommandId = (tag: string): CommandId => + CommandId.make(`server:${tag}:${crypto.randomUUID()}`); + +const make = Effect.gen(function* () { + const orchestrationEngine = yield* OrchestrationEngineService; + const inFlightFollowUpIds = new Set(); + const pendingQueuedDispatchByThreadId = new Map(); + const getReadModel = + orchestrationEngine.getReadModel ?? (() => Effect.die("getReadModel unavailable")); + + const hasQueuedDispatchSettled = Effect.fnUntraced(function* (threadId: ThreadId) { + const dispatchedAt = pendingQueuedDispatchByThreadId.get(threadId); + if (!dispatchedAt) { + return true; + } + const readModel = yield* getReadModel(); + const thread = readModel.threads.find( + (entry) => entry.id === threadId && entry.deletedAt === null, + ); + if (!thread) { + return true; + } + if (thread.session?.status === "starting" || thread.session?.status === "running") { + return false; + } + if (thread.latestTurn && thread.latestTurn.requestedAt >= dispatchedAt) { + return thread.latestTurn.completedAt !== null; + } + return thread.activities.some( + (activity) => + activity.createdAt >= dispatchedAt && activity.kind === "provider.turn.start.failed", + ); + }); + + const processThread = Effect.fnUntraced(function* (threadId: ThreadId) { + const readModel = yield* getReadModel(); + const thread = readModel.threads.find( + (entry) => entry.id === threadId && entry.deletedAt === null, + ); + if (!thread) { + pendingQueuedDispatchByThreadId.delete(threadId); + return; + } + if (pendingQueuedDispatchByThreadId.has(threadId)) { + const settled = yield* hasQueuedDispatchSettled(threadId); + if (!settled) { + return; + } + pendingQueuedDispatchByThreadId.delete(threadId); + } + const queuedHead = thread.queuedFollowUps[0]; + if (!queuedHead) { + return; + } + if ( + !canDispatchQueuedFollowUp({ + session: thread.session, + activities: thread.activities, + queuedFollowUpCount: thread.queuedFollowUps.length, + queuedHeadHasError: queuedHead.lastSendError !== null, + }) + ) { + return; + } + if (inFlightFollowUpIds.has(queuedHead.id)) { + return; + } + + inFlightFollowUpIds.add(queuedHead.id); + yield* Effect.gen(function* () { + const turnStartCreatedAt = new Date().toISOString(); + const turnStartExit = yield* Effect.exit( + orchestrationEngine.dispatch({ + type: "thread.turn.start", + commandId: serverCommandId("queued-follow-up-turn-start"), + threadId, + message: { + messageId: MessageId.make(yield* Random.nextUUIDv4), + role: "user", + text: buildQueuedFollowUpMessageText({ + prompt: queuedHead.prompt, + terminalContexts: queuedHead.terminalContexts, + attachmentCount: queuedHead.attachments.length, + }), + attachments: queuedHead.attachments, + }, + modelSelection: queuedHead.modelSelection, + runtimeMode: queuedHead.runtimeMode, + interactionMode: queuedHead.interactionMode, + createdAt: turnStartCreatedAt, + }), + ); + + if (Exit.isFailure(turnStartExit)) { + yield* orchestrationEngine + .dispatch({ + type: "thread.queued-follow-up.send-failed", + commandId: serverCommandId("queued-follow-up-send-failed"), + threadId, + followUpId: queuedHead.id, + lastSendError: Cause.pretty(turnStartExit.cause), + createdAt: new Date().toISOString(), + }) + .pipe( + Effect.catchCause((nestedCause) => + Effect.logWarning("queued follow-up reactor failed to persist send failure", { + threadId, + followUpId: queuedHead.id, + cause: Cause.pretty(nestedCause), + }), + ), + ); + return; + } + + pendingQueuedDispatchByThreadId.set(threadId, turnStartCreatedAt); + const removeExit = yield* Effect.exit( + orchestrationEngine.dispatch({ + type: "thread.queued-follow-up.remove", + commandId: serverCommandId("queued-follow-up-remove"), + threadId, + followUpId: queuedHead.id, + createdAt: new Date().toISOString(), + }), + ); + + if (Exit.isFailure(removeExit)) { + yield* orchestrationEngine + .dispatch({ + type: "thread.queued-follow-up.send-failed", + commandId: serverCommandId("queued-follow-up-send-failed"), + threadId, + followUpId: queuedHead.id, + lastSendError: "Queued follow-up was sent but queue cleanup failed.", + createdAt: new Date().toISOString(), + }) + .pipe( + Effect.catchCause((nestedCause) => + Effect.logWarning("queued follow-up reactor failed to persist send failure", { + threadId, + followUpId: queuedHead.id, + cause: Cause.pretty(nestedCause), + }), + ), + ); + } + }).pipe(Effect.ensuring(Effect.sync(() => inFlightFollowUpIds.delete(queuedHead.id)))); + }); + + const worker = yield* makeDrainableWorker((threadId: ThreadId) => + processThread(threadId).pipe( + Effect.catchCause((cause) => { + if (Cause.hasInterruptsOnly(cause)) { + return Effect.failCause(cause); + } + return Effect.logWarning("queued follow-up reactor failed to process thread", { + threadId, + cause: Cause.pretty(cause), + }); + }), + ), + ); + + const enqueueThread = (threadId: ThreadId) => worker.enqueue(threadId); + + const start: QueuedFollowUpReactorShape["start"] = Effect.gen(function* () { + const snapshot = yield* getReadModel(); + yield* Effect.forEach( + snapshot.threads, + (thread) => + thread.deletedAt === null && thread.queuedFollowUps.length > 0 + ? enqueueThread(thread.id) + : Effect.void, + { concurrency: 1 }, + ); + + yield* Effect.forkScoped( + Stream.runForEach(orchestrationEngine.streamDomainEvents, (event: OrchestrationEvent) => { + if (event.aggregateKind !== "thread") { + return Effect.void; + } + return enqueueThread(event.aggregateId as ThreadId); + }), + ); + }).pipe(Effect.asVoid); + + return { + start, + drain: worker.drain, + } satisfies QueuedFollowUpReactorShape; +}); + +export const QueuedFollowUpReactorLive = Layer.effect(QueuedFollowUpReactor, make); diff --git a/apps/server/src/orchestration/Normalizer.ts b/apps/server/src/orchestration/Normalizer.ts index 95d29e3d6d..8008dab668 100644 --- a/apps/server/src/orchestration/Normalizer.ts +++ b/apps/server/src/orchestration/Normalizer.ts @@ -73,6 +73,22 @@ export const normalizeDispatchCommand = (command: ClientOrchestrationCommand) => command.message.attachments, (attachment) => Effect.gen(function* () { + if (!("dataUrl" in attachment)) { + const persistedAttachment = attachment as { + type: "image"; + id: string; + name: string; + mimeType: string; + sizeBytes: number; + }; + return { + type: "image" as const, + id: persistedAttachment.id, + name: persistedAttachment.name, + mimeType: persistedAttachment.mimeType, + sizeBytes: persistedAttachment.sizeBytes, + }; + } const parsed = parseBase64DataUrl(attachment.dataUrl); if (!parsed || !parsed.mimeType.startsWith("image/")) { return yield* new OrchestrationDispatchCommandError({ diff --git a/apps/server/src/orchestration/Services/OrchestrationEngine.ts b/apps/server/src/orchestration/Services/OrchestrationEngine.ts index acb2b7b042..51264d3fe7 100644 --- a/apps/server/src/orchestration/Services/OrchestrationEngine.ts +++ b/apps/server/src/orchestration/Services/OrchestrationEngine.ts @@ -10,7 +10,11 @@ * * @module OrchestrationEngineService */ -import type { OrchestrationCommand, OrchestrationEvent } from "@t3tools/contracts"; +import type { + OrchestrationCommand, + OrchestrationEvent, + OrchestrationReadModel, +} from "@t3tools/contracts"; import * as Context from "effect/Context"; import type * as Effect from "effect/Effect"; import type * as Stream from "effect/Stream"; @@ -45,6 +49,8 @@ export interface OrchestrationEngineShape { command: OrchestrationCommand, ) => Effect.Effect<{ sequence: number }, OrchestrationDispatchError, never>; + readonly getReadModel?: () => Effect.Effect; + /** * Stream persisted domain events in dispatch order. * diff --git a/apps/server/src/orchestration/Services/QueuedFollowUpReactor.ts b/apps/server/src/orchestration/Services/QueuedFollowUpReactor.ts new file mode 100644 index 0000000000..23e31208aa --- /dev/null +++ b/apps/server/src/orchestration/Services/QueuedFollowUpReactor.ts @@ -0,0 +1,13 @@ +import * as Context from "effect/Context"; +import type * as Effect from "effect/Effect"; +import type * as Scope from "effect/Scope"; + +export interface QueuedFollowUpReactorShape { + readonly start: Effect.Effect; + readonly drain: Effect.Effect; +} + +export class QueuedFollowUpReactor extends Context.Service< + QueuedFollowUpReactor, + QueuedFollowUpReactorShape +>()("t3/orchestration/Services/QueuedFollowUpReactor") {} diff --git a/apps/server/src/orchestration/commandInvariants.test.ts b/apps/server/src/orchestration/commandInvariants.test.ts index d52f0535fb..96567d1d1f 100644 --- a/apps/server/src/orchestration/commandInvariants.test.ts +++ b/apps/server/src/orchestration/commandInvariants.test.ts @@ -70,6 +70,7 @@ const readModel: OrchestrationReadModel = { archivedAt: null, latestTurn: null, messages: [], + queuedFollowUps: [], session: null, activities: [], proposedPlans: [], @@ -93,6 +94,7 @@ const readModel: OrchestrationReadModel = { archivedAt: null, latestTurn: null, messages: [], + queuedFollowUps: [], session: null, activities: [], proposedPlans: [], diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 1004c945db..a709b3c2c2 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -462,6 +462,123 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }; } + case "thread.queued-follow-up.enqueue": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-follow-up-enqueued", + payload: { + threadId: command.threadId, + followUp: command.followUp, + ...(command.targetIndex !== undefined ? { targetIndex: command.targetIndex } : {}), + createdAt: command.createdAt, + }, + }; + } + + case "thread.queued-follow-up.update": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const existingFollowUp = thread.queuedFollowUps.find( + (followUp) => followUp.id === command.followUp.id, + ); + if (!existingFollowUp) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued follow-up '${command.followUp.id}' does not exist on thread '${command.threadId}'.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-follow-up-updated", + payload: { + threadId: command.threadId, + followUp: command.followUp, + createdAt: command.createdAt, + }, + }; + } + + case "thread.queued-follow-up.remove": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const existingFollowUp = thread.queuedFollowUps.find( + (followUp) => followUp.id === command.followUpId, + ); + if (!existingFollowUp) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued follow-up '${command.followUpId}' does not exist on thread '${command.threadId}'.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-follow-up-removed", + payload: { + threadId: command.threadId, + followUpId: command.followUpId, + createdAt: command.createdAt, + }, + }; + } + + case "thread.queued-follow-up.reorder": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const existingIndex = thread.queuedFollowUps.findIndex( + (followUp) => followUp.id === command.followUpId, + ); + if (existingIndex < 0) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued follow-up '${command.followUpId}' does not exist on thread '${command.threadId}'.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-follow-up-reordered", + payload: { + threadId: command.threadId, + followUpId: command.followUpId, + targetIndex: command.targetIndex, + createdAt: command.createdAt, + }, + }; + } + case "thread.approval.respond": { yield* requireThread({ readModel, @@ -732,6 +849,69 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }; } + case "thread.queued-follow-up.send-failed": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const existingFollowUp = thread.queuedFollowUps.find( + (followUp) => followUp.id === command.followUpId, + ); + if (!existingFollowUp) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued follow-up '${command.followUpId}' does not exist on thread '${command.threadId}'.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-follow-up-send-failed", + payload: { + threadId: command.threadId, + followUpId: command.followUpId, + lastSendError: command.lastSendError, + createdAt: command.createdAt, + }, + }; + } + + case "thread.queued-follow-up.send-error-cleared": { + const thread = yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + const existingFollowUp = thread.queuedFollowUps.find( + (followUp) => followUp.id === command.followUpId, + ); + if (!existingFollowUp) { + return yield* new OrchestrationCommandInvariantError({ + commandType: command.type, + detail: `Queued follow-up '${command.followUpId}' does not exist on thread '${command.threadId}'.`, + }); + } + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.queued-follow-up-send-error-cleared", + payload: { + threadId: command.threadId, + followUpId: command.followUpId, + createdAt: command.createdAt, + }, + }; + } + default: { command satisfies never; const fallback = command as never as { type: string }; diff --git a/apps/server/src/orchestration/projector.test.ts b/apps/server/src/orchestration/projector.test.ts index 01dcb9abea..e12faff61f 100644 --- a/apps/server/src/orchestration/projector.test.ts +++ b/apps/server/src/orchestration/projector.test.ts @@ -92,6 +92,7 @@ describe("orchestration projector", () => { deletedAt: null, messages: [], proposedPlans: [], + queuedFollowUps: [], activities: [], checkpoints: [], session: null, @@ -133,6 +134,177 @@ describe("orchestration projector", () => { ).rejects.toBeDefined(); }); + it("tracks queued follow-ups in the in-memory thread snapshot", async () => { + const createdAt = "2026-03-28T12:00:00.000Z"; + const queuedAt = "2026-03-28T12:00:05.000Z"; + const model = createEmptyReadModel(createdAt); + + const afterCreate = await Effect.runPromise( + projectEvent( + model, + makeEvent({ + sequence: 1, + type: "thread.created", + aggregateKind: "thread", + aggregateId: "thread-queue", + occurredAt: createdAt, + commandId: "cmd-create-queue", + payload: { + threadId: "thread-queue", + projectId: "project-1", + title: "queue demo", + modelSelection: { + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }), + ), + ); + + const afterQueue = await Effect.runPromise( + projectEvent( + afterCreate, + makeEvent({ + sequence: 2, + type: "thread.queued-follow-up-enqueued", + aggregateKind: "thread", + aggregateId: "thread-queue", + occurredAt: queuedAt, + commandId: "cmd-queue-enqueue", + payload: { + threadId: "thread-queue", + followUp: { + id: "follow-up-1", + createdAt: queuedAt, + prompt: "queue this next", + attachments: [], + terminalContexts: [], + modelSelection: { + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + createdAt: queuedAt, + }, + }), + ), + ); + + expect(afterQueue.threads[0]?.queuedFollowUps).toEqual([ + { + id: "follow-up-1", + createdAt: queuedAt, + prompt: "queue this next", + attachments: [], + terminalContexts: [], + modelSelection: { + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + ]); + }); + + it("clears queued follow-ups from the in-memory snapshot on thread.reverted", async () => { + const createdAt = "2026-03-28T12:00:00.000Z"; + const queuedAt = "2026-03-28T12:00:05.000Z"; + const revertedAt = "2026-03-28T12:00:10.000Z"; + const model = createEmptyReadModel(createdAt); + + const afterCreate = await Effect.runPromise( + projectEvent( + model, + makeEvent({ + sequence: 1, + type: "thread.created", + aggregateKind: "thread", + aggregateId: "thread-queue-revert", + occurredAt: createdAt, + commandId: "cmd-create-queue-revert", + payload: { + threadId: "thread-queue-revert", + projectId: "project-1", + title: "queue revert demo", + modelSelection: { + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt, + updatedAt: createdAt, + }, + }), + ), + ); + + const afterQueue = await Effect.runPromise( + projectEvent( + afterCreate, + makeEvent({ + sequence: 2, + type: "thread.queued-follow-up-enqueued", + aggregateKind: "thread", + aggregateId: "thread-queue-revert", + occurredAt: queuedAt, + commandId: "cmd-queue-revert-enqueue", + payload: { + threadId: "thread-queue-revert", + followUp: { + id: "follow-up-revert-1", + createdAt: queuedAt, + prompt: "queue this before revert", + attachments: [], + terminalContexts: [], + modelSelection: { + provider: "codex", + model: "gpt-5.3-codex", + }, + runtimeMode: "full-access", + interactionMode: "default", + lastSendError: null, + }, + createdAt: queuedAt, + }, + }), + ), + ); + + const afterRevert = await Effect.runPromise( + projectEvent( + afterQueue, + makeEvent({ + sequence: 3, + type: "thread.reverted", + aggregateKind: "thread", + aggregateId: "thread-queue-revert", + occurredAt: revertedAt, + commandId: "cmd-queue-revert", + payload: { + threadId: "thread-queue-revert", + turnCount: 0, + }, + }), + ), + ); + + expect(afterRevert.threads[0]?.queuedFollowUps).toEqual([]); + expect(afterRevert.threads[0]?.updatedAt).toBe(revertedAt); + }); + it("applies thread.archived and thread.unarchived events", async () => { const now = "2026-01-01T00:00:00.000Z"; const later = "2026-01-01T00:00:01.000Z"; diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 0c92f96543..40f11145f9 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -4,6 +4,12 @@ import { OrchestrationMessage, OrchestrationSession, OrchestrationThread, + ThreadQueuedFollowUpEnqueuedPayload, + ThreadQueuedFollowUpRemovedPayload, + ThreadQueuedFollowUpReorderedPayload, + ThreadQueuedFollowUpSendErrorClearedPayload, + ThreadQueuedFollowUpSendFailedPayload, + ThreadQueuedFollowUpUpdatedPayload, } from "@t3tools/contracts"; import * as Effect from "effect/Effect"; import * as Schema from "effect/Schema"; @@ -265,6 +271,7 @@ export function projectEvent( archivedAt: null, deletedAt: null, messages: [], + queuedFollowUps: [], activities: [], checkpoints: [], session: null, @@ -418,6 +425,180 @@ export function projectEvent( }; }); + case "thread.queued-follow-up-enqueued": + return decodeForEvent( + ThreadQueuedFollowUpEnqueuedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => ({ + ...nextBase, + threads: nextBase.threads.map((thread) => { + if (thread.id !== payload.threadId) { + return thread; + } + const existingWithoutFollowUp = thread.queuedFollowUps.filter( + (followUp) => followUp.id !== payload.followUp.id, + ); + const targetIndex = + payload.targetIndex === undefined + ? existingWithoutFollowUp.length + : Math.max(0, Math.min(payload.targetIndex, existingWithoutFollowUp.length)); + return { + ...thread, + queuedFollowUps: [ + ...existingWithoutFollowUp.slice(0, targetIndex), + payload.followUp, + ...existingWithoutFollowUp.slice(targetIndex), + ], + updatedAt: event.occurredAt, + }; + }), + })), + ); + + case "thread.queued-follow-up-updated": + return decodeForEvent( + ThreadQueuedFollowUpUpdatedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => ({ + ...nextBase, + threads: nextBase.threads.map((thread) => { + if (thread.id !== payload.threadId) { + return thread; + } + return { + ...thread, + queuedFollowUps: thread.queuedFollowUps.map((followUp) => + followUp.id === payload.followUp.id ? payload.followUp : followUp, + ), + updatedAt: event.occurredAt, + }; + }), + })), + ); + + case "thread.queued-follow-up-removed": + return decodeForEvent( + ThreadQueuedFollowUpRemovedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => ({ + ...nextBase, + threads: nextBase.threads.map((thread) => { + if (thread.id !== payload.threadId) { + return thread; + } + return { + ...thread, + queuedFollowUps: thread.queuedFollowUps.filter( + (followUp) => followUp.id !== payload.followUpId, + ), + updatedAt: event.occurredAt, + }; + }), + })), + ); + + case "thread.queued-follow-up-reordered": + return decodeForEvent( + ThreadQueuedFollowUpReorderedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => ({ + ...nextBase, + threads: nextBase.threads.map((thread) => { + if (thread.id !== payload.threadId) { + return thread; + } + const currentIndex = thread.queuedFollowUps.findIndex( + (followUp) => followUp.id === payload.followUpId, + ); + if (currentIndex < 0) { + return thread; + } + const boundedTargetIndex = Math.max( + 0, + Math.min(payload.targetIndex, thread.queuedFollowUps.length - 1), + ); + if (boundedTargetIndex === currentIndex) { + return thread; + } + const nextQueuedFollowUps = [...thread.queuedFollowUps]; + const [movedFollowUp] = nextQueuedFollowUps.splice(currentIndex, 1); + if (!movedFollowUp) { + return thread; + } + nextQueuedFollowUps.splice(boundedTargetIndex, 0, movedFollowUp); + return { + ...thread, + queuedFollowUps: nextQueuedFollowUps, + updatedAt: event.occurredAt, + }; + }), + })), + ); + + case "thread.queued-follow-up-send-failed": + return decodeForEvent( + ThreadQueuedFollowUpSendFailedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => ({ + ...nextBase, + threads: nextBase.threads.map((thread) => { + if (thread.id !== payload.threadId) { + return thread; + } + return { + ...thread, + queuedFollowUps: thread.queuedFollowUps.map((followUp) => + followUp.id === payload.followUpId + ? { ...followUp, lastSendError: payload.lastSendError } + : followUp, + ), + updatedAt: event.occurredAt, + }; + }), + })), + ); + + case "thread.queued-follow-up-send-error-cleared": + return decodeForEvent( + ThreadQueuedFollowUpSendErrorClearedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => ({ + ...nextBase, + threads: nextBase.threads.map((thread) => { + if (thread.id !== payload.threadId) { + return thread; + } + return { + ...thread, + queuedFollowUps: thread.queuedFollowUps.map((followUp) => + followUp.id === payload.followUpId + ? { ...followUp, lastSendError: null } + : followUp, + ), + updatedAt: event.occurredAt, + }; + }), + })), + ); + case "thread.session-set": return Effect.gen(function* () { const payload = yield* decodeForEvent( @@ -609,6 +790,7 @@ export function projectEvent( threads: updateThread(nextBase.threads, payload.threadId, { checkpoints, messages, + queuedFollowUps: [], proposedPlans, activities, latestTurn, diff --git a/apps/server/src/persistence/Layers/ProjectionThreadQueuedFollowUps.ts b/apps/server/src/persistence/Layers/ProjectionThreadQueuedFollowUps.ts new file mode 100644 index 0000000000..4028853633 --- /dev/null +++ b/apps/server/src/persistence/Layers/ProjectionThreadQueuedFollowUps.ts @@ -0,0 +1,181 @@ +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Schema from "effect/Schema"; +import * as Struct from "effect/Struct"; +import { + ChatAttachment, + ModelSelection, + OrchestrationQueuedTerminalContext, +} from "@t3tools/contracts"; + +import { toPersistenceSqlError } from "../Errors.ts"; +import { + DeleteProjectionThreadQueuedFollowUpsInput, + GetProjectionThreadQueuedFollowUpInput, + ListProjectionThreadQueuedFollowUpsInput, + ProjectionThreadQueuedFollowUp, + ProjectionThreadQueuedFollowUpRepository, + type ProjectionThreadQueuedFollowUpRepositoryShape, +} from "../Services/ProjectionThreadQueuedFollowUps.ts"; + +const ProjectionThreadQueuedFollowUpDbRowSchema = ProjectionThreadQueuedFollowUp.mapFields( + Struct.assign({ + attachments: Schema.fromJsonString(Schema.Array(ChatAttachment)), + terminalContexts: Schema.fromJsonString(Schema.Array(OrchestrationQueuedTerminalContext)), + modelSelection: Schema.fromJsonString(ModelSelection), + }), +); + +const makeProjectionThreadQueuedFollowUpRepository = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const listQueuedFollowUpRows = SqlSchema.findAll({ + Request: ListProjectionThreadQueuedFollowUpsInput, + Result: ProjectionThreadQueuedFollowUpDbRowSchema, + execute: ({ threadId }) => + sql` + SELECT + follow_up_id AS "followUpId", + thread_id AS "threadId", + queue_position AS "queuePosition", + created_at AS "createdAt", + updated_at AS "updatedAt", + prompt, + attachments_json AS "attachments", + terminal_contexts_json AS "terminalContexts", + model_selection_json AS "modelSelection", + runtime_mode AS "runtimeMode", + interaction_mode AS "interactionMode", + last_send_error AS "lastSendError" + FROM projection_thread_queued_follow_ups + WHERE thread_id = ${threadId} + ORDER BY queue_position ASC, created_at ASC, follow_up_id ASC + `, + }); + + const getQueuedFollowUpRow = SqlSchema.findOneOption({ + Request: GetProjectionThreadQueuedFollowUpInput, + Result: ProjectionThreadQueuedFollowUpDbRowSchema, + execute: ({ followUpId }) => + sql` + SELECT + follow_up_id AS "followUpId", + thread_id AS "threadId", + queue_position AS "queuePosition", + created_at AS "createdAt", + updated_at AS "updatedAt", + prompt, + attachments_json AS "attachments", + terminal_contexts_json AS "terminalContexts", + model_selection_json AS "modelSelection", + runtime_mode AS "runtimeMode", + interaction_mode AS "interactionMode", + last_send_error AS "lastSendError" + FROM projection_thread_queued_follow_ups + WHERE follow_up_id = ${followUpId} + `, + }); + + const deleteQueuedFollowUpsByThreadId = SqlSchema.void({ + Request: DeleteProjectionThreadQueuedFollowUpsInput, + execute: ({ threadId }) => + sql` + DELETE FROM projection_thread_queued_follow_ups + WHERE thread_id = ${threadId} + `, + }); + + const insertQueuedFollowUpRow = SqlSchema.void({ + Request: ProjectionThreadQueuedFollowUp, + execute: (row) => + sql` + INSERT INTO projection_thread_queued_follow_ups ( + follow_up_id, + thread_id, + queue_position, + created_at, + updated_at, + prompt, + attachments_json, + terminal_contexts_json, + model_selection_json, + runtime_mode, + interaction_mode, + last_send_error + ) + VALUES ( + ${row.followUpId}, + ${row.threadId}, + ${row.queuePosition}, + ${row.createdAt}, + ${row.updatedAt}, + ${row.prompt}, + ${JSON.stringify(row.attachments)}, + ${JSON.stringify(row.terminalContexts)}, + ${JSON.stringify(row.modelSelection)}, + ${row.runtimeMode}, + ${row.interactionMode}, + ${row.lastSendError} + ) + `, + }); + + const listByThreadId: ProjectionThreadQueuedFollowUpRepositoryShape["listByThreadId"] = (input) => + listQueuedFollowUpRows(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadQueuedFollowUpRepository.listByThreadId:query"), + ), + ); + + const getById: ProjectionThreadQueuedFollowUpRepositoryShape["getById"] = (input) => + getQueuedFollowUpRow(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadQueuedFollowUpRepository.getById:query"), + ), + ); + + const replaceByThreadId: ProjectionThreadQueuedFollowUpRepositoryShape["replaceByThreadId"] = ( + input, + ) => + Effect.gen(function* () { + yield* deleteQueuedFollowUpsByThreadId({ threadId: input.threadId }).pipe( + Effect.mapError( + toPersistenceSqlError( + "ProjectionThreadQueuedFollowUpRepository.replaceByThreadId:delete", + ), + ), + ); + yield* Effect.forEach(input.followUps, (followUp) => + insertQueuedFollowUpRow(followUp).pipe( + Effect.mapError( + toPersistenceSqlError( + "ProjectionThreadQueuedFollowUpRepository.replaceByThreadId:insert", + ), + ), + ), + ).pipe(Effect.asVoid); + }); + + const deleteByThreadId: ProjectionThreadQueuedFollowUpRepositoryShape["deleteByThreadId"] = ( + input, + ) => + deleteQueuedFollowUpsByThreadId(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadQueuedFollowUpRepository.deleteByThreadId:query"), + ), + ); + + return { + listByThreadId, + getById, + replaceByThreadId, + deleteByThreadId, + } satisfies ProjectionThreadQueuedFollowUpRepositoryShape; +}); + +export const ProjectionThreadQueuedFollowUpRepositoryLive = Layer.effect( + ProjectionThreadQueuedFollowUpRepository, + makeProjectionThreadQueuedFollowUpRepository, +); diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index cc5024d5f5..e1117fb6fa 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -43,6 +43,7 @@ import Migration0027 from "./Migrations/027_ProviderSessionRuntimeInstanceId.ts" import Migration0028 from "./Migrations/028_ProjectionThreadSessionInstanceId.ts"; import Migration0029 from "./Migrations/029_ProjectionThreadDetailOrderingIndexes.ts"; import Migration0030 from "./Migrations/030_ProjectionThreadShellArchiveIndexes.ts"; +import Migration0031 from "./Migrations/031_ProjectionThreadQueuedFollowUps.ts"; /** * Migration loader with all migrations defined inline. @@ -85,6 +86,7 @@ export const migrationEntries = [ [28, "ProjectionThreadSessionInstanceId", Migration0028], [29, "ProjectionThreadDetailOrderingIndexes", Migration0029], [30, "ProjectionThreadShellArchiveIndexes", Migration0030], + [31, "ProjectionThreadQueuedFollowUps", Migration0031], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/031_ProjectionThreadQueuedFollowUps.ts b/apps/server/src/persistence/Migrations/031_ProjectionThreadQueuedFollowUps.ts new file mode 100644 index 0000000000..9a19a10b07 --- /dev/null +++ b/apps/server/src/persistence/Migrations/031_ProjectionThreadQueuedFollowUps.ts @@ -0,0 +1,29 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + CREATE TABLE IF NOT EXISTS projection_thread_queued_follow_ups ( + follow_up_id TEXT PRIMARY KEY, + thread_id TEXT NOT NULL, + queue_position INTEGER NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + prompt TEXT NOT NULL, + attachments_json TEXT NOT NULL, + terminal_contexts_json TEXT NOT NULL, + model_selection_json TEXT NOT NULL, + runtime_mode TEXT NOT NULL, + interaction_mode TEXT NOT NULL, + last_send_error TEXT, + FOREIGN KEY (thread_id) REFERENCES projection_threads(thread_id) ON DELETE CASCADE + ) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_thread_queued_follow_ups_thread_position + ON projection_thread_queued_follow_ups(thread_id, queue_position, created_at) + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionThreadQueuedFollowUps.ts b/apps/server/src/persistence/Services/ProjectionThreadQueuedFollowUps.ts new file mode 100644 index 0000000000..d89e1d8b47 --- /dev/null +++ b/apps/server/src/persistence/Services/ProjectionThreadQueuedFollowUps.ts @@ -0,0 +1,97 @@ +import { + IsoDateTime, + ModelSelection, + OrchestrationQueuedFollowUp, + OrchestrationQueuedTerminalContext, + RuntimeMode, + ProviderInteractionMode, + ThreadId, + TrimmedNonEmptyString, + NonNegativeInt, + ChatAttachment, +} from "@t3tools/contracts"; +import * as Context from "effect/Context"; +import type * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; + +import type { ProjectionRepositoryError } from "../Errors.ts"; + +export const ProjectionThreadQueuedFollowUp = Schema.Struct({ + followUpId: TrimmedNonEmptyString, + threadId: ThreadId, + queuePosition: NonNegativeInt, + createdAt: IsoDateTime, + updatedAt: IsoDateTime, + prompt: Schema.String, + attachments: Schema.Array(ChatAttachment), + terminalContexts: Schema.Array(OrchestrationQueuedTerminalContext), + modelSelection: ModelSelection, + runtimeMode: RuntimeMode, + interactionMode: ProviderInteractionMode, + lastSendError: Schema.NullOr(TrimmedNonEmptyString), +}); +export type ProjectionThreadQueuedFollowUp = typeof ProjectionThreadQueuedFollowUp.Type; + +export const ListProjectionThreadQueuedFollowUpsInput = Schema.Struct({ + threadId: ThreadId, +}); +export type ListProjectionThreadQueuedFollowUpsInput = + typeof ListProjectionThreadQueuedFollowUpsInput.Type; + +export const GetProjectionThreadQueuedFollowUpInput = Schema.Struct({ + followUpId: TrimmedNonEmptyString, +}); +export type GetProjectionThreadQueuedFollowUpInput = + typeof GetProjectionThreadQueuedFollowUpInput.Type; + +export const ReplaceProjectionThreadQueuedFollowUpsInput = Schema.Struct({ + threadId: ThreadId, + followUps: Schema.Array(ProjectionThreadQueuedFollowUp), +}); +export type ReplaceProjectionThreadQueuedFollowUpsInput = + typeof ReplaceProjectionThreadQueuedFollowUpsInput.Type; + +export const DeleteProjectionThreadQueuedFollowUpsInput = Schema.Struct({ + threadId: ThreadId, +}); +export type DeleteProjectionThreadQueuedFollowUpsInput = + typeof DeleteProjectionThreadQueuedFollowUpsInput.Type; + +export function projectionQueuedFollowUpToContract( + row: ProjectionThreadQueuedFollowUp, +): OrchestrationQueuedFollowUp { + return { + id: row.followUpId, + createdAt: row.createdAt, + prompt: row.prompt, + attachments: row.attachments, + terminalContexts: row.terminalContexts, + modelSelection: row.modelSelection, + runtimeMode: row.runtimeMode, + interactionMode: row.interactionMode, + lastSendError: row.lastSendError, + }; +} + +export interface ProjectionThreadQueuedFollowUpRepositoryShape { + readonly listByThreadId: ( + input: ListProjectionThreadQueuedFollowUpsInput, + ) => Effect.Effect, ProjectionRepositoryError>; + readonly getById: ( + input: GetProjectionThreadQueuedFollowUpInput, + ) => Effect.Effect, ProjectionRepositoryError>; + readonly replaceByThreadId: ( + input: ReplaceProjectionThreadQueuedFollowUpsInput, + ) => Effect.Effect; + readonly deleteByThreadId: ( + input: DeleteProjectionThreadQueuedFollowUpsInput, + ) => Effect.Effect; +} + +export class ProjectionThreadQueuedFollowUpRepository extends Context.Service< + ProjectionThreadQueuedFollowUpRepository, + ProjectionThreadQueuedFollowUpRepositoryShape +>()( + "t3/persistence/Services/ProjectionThreadQueuedFollowUps/ProjectionThreadQueuedFollowUpRepository", +) {} diff --git a/apps/server/src/server.test.ts b/apps/server/src/server.test.ts index 8394897c48..dcf5b7d6bf 100644 --- a/apps/server/src/server.test.ts +++ b/apps/server/src/server.test.ts @@ -169,6 +169,7 @@ const makeDefaultOrchestrationReadModel = () => { archivedAt: null, latestTurn: null, messages: [], + queuedFollowUps: [], session: null, activities: [], proposedPlans: [], @@ -3214,6 +3215,7 @@ it.layer(NodeServices.layer)("server router seam", (it) => { archivedAt: null, latestTurn: null, messages: [], + queuedFollowUps: [], session: null, activities: [], proposedPlans: [], diff --git a/apps/server/tsconfig.json b/apps/server/tsconfig.json index b86bbf1f16..21b3182e13 100644 --- a/apps/server/tsconfig.json +++ b/apps/server/tsconfig.json @@ -3,7 +3,8 @@ "compilerOptions": { "composite": true, "types": ["node", "bun"], - "lib": ["ESNext", "esnext.disposable"] + "lib": ["ESNext", "esnext.disposable"], + "plugins": [] }, "include": ["src", "tsdown.config.ts", "scripts", "integration", "../../scripts/lib"] } diff --git a/apps/web/package.json b/apps/web/package.json index ba386f7d94..1e5a862a5b 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -39,7 +39,9 @@ "react-dom": "^19.0.0", "react-markdown": "^10.1.0", "remark-gfm": "^4.0.1", + "remark-parse": "^11.0.0", "tailwind-merge": "^3.4.0", + "unified": "^11.0.5", "zustand": "^5.0.11" }, "devDependencies": { diff --git a/apps/web/src/components/ChatMarkdown.test.tsx b/apps/web/src/components/ChatMarkdown.test.tsx new file mode 100644 index 0000000000..a4e36f58b1 --- /dev/null +++ b/apps/web/src/components/ChatMarkdown.test.tsx @@ -0,0 +1,43 @@ +import { renderToStaticMarkup } from "react-dom/server"; +import { describe, expect, it, vi } from "vitest"; + +vi.mock("../hooks/useTheme", () => ({ + useTheme: () => ({ + theme: "light", + resolvedTheme: "light", + }), +})); + +describe("ChatMarkdown", () => { + it("highlights assistant markdown text matches", async () => { + const { default: ChatMarkdown } = await import("./ChatMarkdown"); + const markup = renderToStaticMarkup( + , + ); + + expect(markup).toContain('data-thread-search-highlight="active"'); + expect(markup).toContain("highlight<"); + }); + + it("highlights fenced code matches without dropping the visible mark", async () => { + const { default: ChatMarkdown } = await import("./ChatMarkdown"); + const markup = renderToStaticMarkup( + , + ); + + expect(markup).toContain('data-thread-search-highlight="active"'); + expect(markup).toContain("highlightNeedle<"); + }); +}); diff --git a/apps/web/src/components/ChatMarkdown.tsx b/apps/web/src/components/ChatMarkdown.tsx index 6a85ccee96..24004779d6 100644 --- a/apps/web/src/components/ChatMarkdown.tsx +++ b/apps/web/src/components/ChatMarkdown.tsx @@ -21,6 +21,7 @@ import { defaultUrlTransform } from "react-markdown"; import remarkGfm from "remark-gfm"; import { VscodeEntryIcon } from "./chat/VscodeEntryIcon"; import { renderSkillInlineMarkdownChildren } from "./chat/SkillInlineText"; +import { createThreadSearchHighlightRehypePlugin } from "./chat/threadSearchHighlight"; import { Tooltip, TooltipPopup, TooltipTrigger } from "./ui/tooltip"; import { stackedThreadToast, toastManager } from "./ui/toast"; import { openInPreferredEditor } from "../editorPreferences"; @@ -62,6 +63,8 @@ interface ChatMarkdownProps { cwd: string | undefined; isStreaming?: boolean; skills?: ReadonlyArray>; + searchQuery?: string; + searchActive?: boolean; } const EMPTY_MARKDOWN_SKILLS: ReadonlyArray> = []; @@ -517,6 +520,8 @@ function ChatMarkdown({ cwd, isStreaming = false, skills = EMPTY_MARKDOWN_SKILLS, + searchQuery = "", + searchActive = false, }: ChatMarkdownProps) { const { resolvedTheme } = useTheme(); const diffThemeName = resolveDiffThemeName(resolvedTheme); @@ -542,6 +547,12 @@ function ChatMarkdown({ const markdownUrlTransform = useCallback((href: string) => { return rewriteMarkdownFileUriHref(href) ?? defaultUrlTransform(href); }, []); + const rehypePlugins = useMemo(() => { + const highlightPlugin = createThreadSearchHighlightRehypePlugin(searchQuery, { + active: searchActive, + }); + return highlightPlugin ? [highlightPlugin] : []; + }, [searchActive, searchQuery]); const markdownComponents = useMemo( () => ({ p({ node: _node, children, ...props }) { @@ -616,6 +627,7 @@ function ChatMarkdown({
diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index bb62d8edbc..d5a585bbc4 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -363,6 +363,7 @@ function createSnapshotForTargetUser(options: { archivedAt: null, deletedAt: null, messages, + queuedFollowUps: [], activities: [], proposedPlans: [], checkpoints: [], @@ -428,6 +429,7 @@ function addThreadToSnapshot( archivedAt: null, deletedAt: null, messages: [], + queuedFollowUps: [], activities: [], proposedPlans: [], checkpoints: [], @@ -764,6 +766,7 @@ function createSnapshotWithSecondaryProject(options?: { messages: [], activities: [], proposedPlans: [], + queuedFollowUps: [], checkpoints: [], session: { threadId: "thread-secondary-project" as ThreadId, @@ -796,6 +799,7 @@ function createSnapshotWithSecondaryProject(options?: { messages: [], activities: [], proposedPlans: [], + queuedFollowUps: [], checkpoints: [], session: { threadId: ARCHIVED_SECONDARY_THREAD_ID, diff --git a/apps/web/src/components/ChatView.logic.ts b/apps/web/src/components/ChatView.logic.ts index bf87add28d..5b5fbe74e3 100644 --- a/apps/web/src/components/ChatView.logic.ts +++ b/apps/web/src/components/ChatView.logic.ts @@ -8,7 +8,13 @@ import { type ThreadId, type TurnId, } from "@t3tools/contracts"; -import { type ChatMessage, type SessionPhase, type Thread, type ThreadSession } from "../types"; +import { + type ChatMessage, + type QueuedFollowUp, + type SessionPhase, + type Thread, + type ThreadSession, +} from "../types"; import { type ComposerImageAttachment, type DraftThreadState } from "../composerDraftStore"; import * as Schema from "effect/Schema"; import { selectThreadByRef, useStore } from "../store"; @@ -50,9 +56,15 @@ export function buildLocalDraftThread( turnDiffSummaries: [], activities: [], proposedPlans: [], + queuedFollowUps: [], }; } +export function describeQueuedFollowUp(followUp: Pick): string { + const trimmed = followUp.prompt.trim().replace(/\s+/g, " "); + return trimmed.length > 0 ? trimmed : "Empty follow-up"; +} + export function shouldWriteThreadErrorToCurrentServerThread(input: { serverThread: | { diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index 611eaf572d..56fe5dcc2d 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -182,6 +182,7 @@ function createMinimalSnapshot(): OrchestrationReadModel { updatedAt: NOW_ISO, }, ], + queuedFollowUps: [], activities: [], proposedPlans: [], checkpoints: [], diff --git a/apps/web/src/components/chat/ComposerQueuedFollowUpsPanel.tsx b/apps/web/src/components/chat/ComposerQueuedFollowUpsPanel.tsx new file mode 100644 index 0000000000..6e0f0ef4b3 --- /dev/null +++ b/apps/web/src/components/chat/ComposerQueuedFollowUpsPanel.tsx @@ -0,0 +1,230 @@ +import type * as React from "react"; +import { memo, useEffect, useRef, useState } from "react"; +import { CornerDownRightIcon, EllipsisIcon, PencilIcon, Trash2Icon } from "lucide-react"; +import { type QueuedFollowUp } from "../../types"; +import { describeQueuedFollowUp } from "../ChatView.logic"; +import { Button } from "../ui/button"; + +function resolveDropPosition( + event: Pick, "clientY" | "currentTarget">, +): "before" | "after" { + const bounds = event.currentTarget.getBoundingClientRect(); + return event.clientY <= bounds.top + bounds.height / 2 ? "before" : "after"; +} + +function resolveTargetIndex( + currentIndex: number, + hoveredIndex: number, + position: "before" | "after", +): number { + if (position === "before") { + return currentIndex < hoveredIndex ? hoveredIndex - 1 : hoveredIndex; + } + return currentIndex < hoveredIndex ? hoveredIndex : hoveredIndex + 1; +} + +function QueuedFollowUpSummaryIcon() { + return ( + + ); +} + +function DragGripDots() { + return ( +