From c29fa0ee7e78bee697fd837fa17ae1135aaf2f54 Mon Sep 17 00:00:00 2001 From: SimoneBovi Date: Mon, 20 Apr 2026 18:35:16 +0200 Subject: [PATCH] Fix T3 import reliability and surface provider usage --- apps/server/src/legacyStateImport.ts | 1136 +++++++++++++++++ apps/server/src/main.ts | 15 - .../Layers/ProjectionPipeline.ts | 202 ++- apps/server/src/orchestration/Schemas.ts | 6 + apps/server/src/orchestration/decider.ts | 67 +- apps/server/src/orchestration/projector.ts | 199 ++- apps/server/src/persistence/Migrations.ts | 2 + .../031_RepairLegacyProjectionSchema.test.ts | 180 +++ .../031_RepairLegacyProjectionSchema.ts | 148 +++ apps/server/src/providerUsageSnapshot.ts | 609 +++++++++ apps/server/src/wsServer.ts | 30 + apps/web/src/appSettings.ts | 1 + apps/web/src/components/BranchToolbar.tsx | 43 +- apps/web/src/components/ChatView.tsx | 47 +- .../components/ProviderUsagePanelContent.tsx | 90 ++ apps/web/src/components/Sidebar.tsx | 41 +- .../components/chat/ComposerUsageBadge.tsx | 93 ++ apps/web/src/hooks/useLegacyT3Import.ts | 97 ++ apps/web/src/hooks/useProviderUsageSummary.ts | 83 ++ apps/web/src/lib/providerUsageSnapshot.ts | 39 + apps/web/src/lib/rateLimits.ts | 6 + apps/web/src/lib/serverReactQuery.ts | 25 + apps/web/src/routes/_chat.settings.tsx | 61 +- apps/web/src/store.ts | 141 ++ apps/web/src/wsNativeApi.test.ts | 37 + apps/web/src/wsNativeApi.ts | 9 + packages/contracts/src/ipc.ts | 10 + packages/contracts/src/orchestration.ts | 103 +- packages/contracts/src/server.ts | 35 + packages/contracts/src/ws.ts | 8 + 30 files changed, 3423 insertions(+), 140 deletions(-) create mode 100644 apps/server/src/legacyStateImport.ts create mode 100644 apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.test.ts create mode 100644 apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.ts create mode 100644 apps/server/src/providerUsageSnapshot.ts create mode 100644 apps/web/src/components/ProviderUsagePanelContent.tsx create mode 100644 apps/web/src/components/chat/ComposerUsageBadge.tsx create mode 100644 apps/web/src/hooks/useLegacyT3Import.ts create mode 100644 apps/web/src/hooks/useProviderUsageSummary.ts create mode 100644 apps/web/src/lib/providerUsageSnapshot.ts diff --git a/apps/server/src/legacyStateImport.ts b/apps/server/src/legacyStateImport.ts new file mode 100644 index 00000000..1851a443 --- /dev/null +++ b/apps/server/src/legacyStateImport.ts @@ -0,0 +1,1136 @@ +import { DatabaseSync } from "node:sqlite"; + +import { + CheckpointRef, + ChatAttachment, + CommandId, + EventId, + MessageId, + ModelSelection, + NonNegativeInt, + OrchestrationMessageRole, + OrchestrationMessageSource, + OrchestrationCheckpointFile, + OrchestrationImportLegacyT3StateResult, + OrchestrationProposedPlanId, + OrchestrationThreadActivityTone, + OrchestrationThreadPullRequest, + ProviderMentionReference, + ProviderSkillReference, + ProjectId, + ProjectKind, + ProjectScript, + ThreadHandoff, + ThreadId, + TurnDispatchMode, + TrimmedNonEmptyString, + TurnId, +} from "@t3tools/contracts"; +import { workspaceRootsEqual } from "@t3tools/shared/threadWorkspace"; +import { Data, Effect, FileSystem, Path, Schema } from "effect"; + +import { resolveAttachmentPath } from "./attachmentStore.ts"; +import { deriveServerPaths, ServerConfig, type ServerDerivedPaths } from "./config.ts"; +import { LEGACY_T3_HOME_DIRNAME } from "./homeMigration.ts"; +import { OrchestrationEngineService } from "./orchestration/Services/OrchestrationEngine.ts"; + +export class LegacyStateImportError extends Data.TaggedError("LegacyStateImportError")<{ + readonly message: string; + readonly cause?: unknown; +}> {} + +function isLegacyStateImportError(cause: unknown): cause is LegacyStateImportError { + return ( + typeof cause === "object" && + cause !== null && + "_tag" in cause && + (cause as { readonly _tag?: unknown })._tag === "LegacyStateImportError" + ); +} + +type SourceProjectSnapshot = { + readonly id: ProjectId; + readonly kind: ProjectKind; + readonly title: string; + readonly workspaceRoot: string; + readonly defaultModelSelection: typeof ModelSelection.Type | null; + readonly scripts: ReadonlyArray; + readonly createdAt: string; + readonly updatedAt: string; + readonly deletedAt: string | null; +}; + +type SourceImportedMessage = { + readonly messageId: string; + readonly role: typeof OrchestrationMessageRole.Type; + readonly text: string; + readonly attachments: ReadonlyArray; + readonly skills: ReadonlyArray; + readonly mentions: ReadonlyArray; + readonly dispatchMode: typeof TurnDispatchMode.Type | null; + readonly turnId: string | null; + readonly streaming: boolean; + readonly source: typeof OrchestrationMessageSource.Type; + readonly createdAt: string; + readonly updatedAt: string; +}; + +type SourceThreadActivity = { + readonly id: string; + readonly turnId: string | null; + readonly tone: typeof OrchestrationThreadActivityTone.Type; + readonly kind: string; + readonly summary: string; + readonly payload: unknown; + readonly sequence: number | null; + readonly createdAt: string; +}; + +type SourceProposedPlan = { + readonly id: string; + readonly turnId: string | null; + readonly planMarkdown: string; + readonly implementedAt: string | null; + readonly implementationThreadId: ThreadId | null; + readonly createdAt: string; + readonly updatedAt: string; +}; + +type SourceCheckpoint = { + readonly turnId: string; + readonly checkpointTurnCount: number; + readonly checkpointRef: string; + readonly status: "ready" | "missing" | "error"; + readonly files: ReadonlyArray; + readonly assistantMessageId: string | null; + readonly completedAt: string; +}; + +type SourceThreadSnapshot = { + readonly id: ThreadId; + readonly projectId: ProjectId; + readonly title: string; + readonly modelSelection: typeof ModelSelection.Type; + readonly runtimeMode: "approval-required" | "full-access"; + readonly interactionMode: "default" | "plan"; + readonly envMode: "local" | "worktree"; + readonly branch: string | null; + readonly worktreePath: string | null; + readonly associatedWorktreePath: string | null; + readonly associatedWorktreeBranch: string | null; + readonly associatedWorktreeRef: string | null; + readonly parentThreadId: ThreadId | null; + readonly subagentAgentId: string | null; + readonly subagentNickname: string | null; + readonly subagentRole: string | null; + readonly lastKnownPr: typeof OrchestrationThreadPullRequest.Type | null; + readonly handoff: typeof ThreadHandoff.Type | null; + readonly createdAt: string; + readonly updatedAt: string; + readonly archivedAt: string | null; + readonly deletedAt: string | null; + readonly messages: ReadonlyArray; + readonly activities: ReadonlyArray; + readonly proposedPlans: ReadonlyArray; + readonly checkpoints: ReadonlyArray; +}; + +type SourceSnapshot = { + readonly projects: ReadonlyArray; + readonly threads: ReadonlyArray; +}; + +type ImportCounters = { + importedProjects: number; + mappedProjects: number; + skippedProjects: number; + importedThreads: number; + skippedThreads: number; + importedMessages: number; + importedActivities: number; + importedProposedPlans: number; + importedCheckpoints: number; + copiedAttachments: number; + skippedAttachments: number; + missingAttachments: number; +}; + +type SourcePathCandidate = ServerDerivedPaths & { + readonly baseDir: string; +}; + +type SourceProjectRow = { + readonly projectId: string; + readonly kind: string; + readonly title: string; + readonly workspaceRoot: string; + readonly defaultModelSelectionJson: string | null; + readonly scriptsJson: string; + readonly createdAt: string; + readonly updatedAt: string; + readonly deletedAt: string | null; +}; + +type SourceThreadRow = { + readonly threadId: string; + readonly projectId: string; + readonly title: string; + readonly modelSelectionJson: string; + readonly runtimeMode: string; + readonly interactionMode: string; + readonly envMode: string; + readonly branch: string | null; + readonly worktreePath: string | null; + readonly associatedWorktreePath: string | null; + readonly associatedWorktreeBranch: string | null; + readonly associatedWorktreeRef: string | null; + readonly parentThreadId: string | null; + readonly subagentAgentId: string | null; + readonly subagentNickname: string | null; + readonly subagentRole: string | null; + readonly lastKnownPrJson: string | null; + readonly handoffJson: string | null; + readonly createdAt: string; + readonly updatedAt: string; + readonly archivedAt: string | null; + readonly deletedAt: string | null; +}; + +type SourceMessageRow = { + readonly messageId: string; + readonly threadId: string; + readonly role: string; + readonly text: string; + readonly turnId: string | null; + readonly attachmentsJson: string | null; + readonly skillsJson: string | null; + readonly mentionsJson: string | null; + readonly dispatchMode: string | null; + readonly isStreaming: number | null; + readonly source: string | null; + readonly createdAt: string; + readonly updatedAt: string; +}; + +type SourceActivityRow = { + readonly activityId: string; + readonly threadId: string; + readonly turnId: string | null; + readonly tone: string; + readonly kind: string; + readonly summary: string; + readonly payloadJson: string; + readonly sequence: number | null; + readonly createdAt: string; +}; + +type SourceProposedPlanRow = { + readonly planId: string; + readonly threadId: string; + readonly turnId: string | null; + readonly planMarkdown: string; + readonly implementedAt: string | null; + readonly implementationThreadId: string | null; + readonly createdAt: string; + readonly updatedAt: string; +}; + +type SourceCheckpointRow = { + readonly threadId: string; + readonly turnId: string; + readonly checkpointTurnCount: number; + readonly checkpointRef: string; + readonly checkpointStatus: string; + readonly checkpointFilesJson: string; + readonly assistantMessageId: string | null; + readonly completedAt: string; +}; + +const decodeModelSelection = Schema.decodeUnknownSync(ModelSelection); +const decodeProjectScripts = Schema.decodeUnknownSync(Schema.Array(ProjectScript)); +const decodeChatAttachments = Schema.decodeUnknownSync(Schema.Array(ChatAttachment)); +const decodeProviderSkills = Schema.decodeUnknownSync(Schema.Array(ProviderSkillReference)); +const decodeProviderMentions = Schema.decodeUnknownSync(Schema.Array(ProviderMentionReference)); +const decodeThreadHandoff = Schema.decodeUnknownSync(ThreadHandoff); +const decodeThreadPullRequest = Schema.decodeUnknownSync(OrchestrationThreadPullRequest); +const decodeCheckpointFiles = Schema.decodeUnknownSync(Schema.Array(OrchestrationCheckpointFile)); +const decodeUnknownJson = Schema.decodeUnknownSync(Schema.Unknown); +const asCommandId = () => CommandId.makeUnsafe(`import-legacy-t3:${crypto.randomUUID()}`); + +class SourceDatabaseReader { + private tableNames: Set | null = null; + private readonly columnsByTable = new Map>(); + + constructor(private readonly db: DatabaseSync) {} + + all(query: string): ReadonlyArray { + return this.db.prepare(query).all() as unknown as ReadonlyArray; + } + + hasTable(tableName: string): boolean { + if (this.tableNames === null) { + this.tableNames = new Set( + this.all<{ readonly name: string }>( + "SELECT name FROM sqlite_master WHERE type = 'table'", + ).map((row) => row.name), + ); + } + return this.tableNames.has(tableName); + } + + hasColumn(tableName: string, columnName: string): boolean { + let columns = this.columnsByTable.get(tableName); + if (!columns) { + if (!this.hasTable(tableName)) { + columns = new Set(); + } else { + columns = new Set( + this.all<{ readonly name: string }>(`PRAGMA table_info("${tableName}")`).map( + (row) => row.name, + ), + ); + } + this.columnsByTable.set(tableName, columns); + } + return columns.has(columnName); + } +} + +function requiredTable(reader: SourceDatabaseReader, tableName: string): void { + if (reader.hasTable(tableName)) { + return; + } + throw new LegacyStateImportError({ + message: `The legacy T3 database is missing the required table '${tableName}'.`, + }); +} + +function selectColumn( + reader: SourceDatabaseReader, + tableName: string, + columnName: string, + alias: string, + fallbackSql = "NULL", +): string { + return reader.hasColumn(tableName, columnName) + ? `"${columnName}" AS "${alias}"` + : `${fallbackSql} AS "${alias}"`; +} + +function decodeJsonColumn( + raw: string | null | undefined, + decode: (value: unknown) => T, + fallback: T, + fieldName: string, +): T { + if (raw === null || raw === undefined || raw.length === 0) { + return fallback; + } + try { + return decode(JSON.parse(raw)); + } catch (cause) { + throw new LegacyStateImportError({ + message: `Failed to decode '${fieldName}' from the legacy T3 DB.`, + cause, + }); + } +} + +const asMessageId = (value: string) => MessageId.makeUnsafe(value); +const asEventId = (value: string) => EventId.makeUnsafe(value); +const asCheckpointRef = (value: string) => CheckpointRef.makeUnsafe(value); +const asNonNegativeInt = (value: number) => NonNegativeInt.makeUnsafe(value); +const asTurnId = (value: string) => TurnId.makeUnsafe(value); +const asTrimmedNonEmptyString = (value: string) => TrimmedNonEmptyString.makeUnsafe(value); +const asOptionalTrimmedNonEmptyString = (value: string | null) => + value === null ? null : asTrimmedNonEmptyString(value); +const asProposedPlanId = (value: string) => OrchestrationProposedPlanId.makeUnsafe(value); + +function maxIso(values: ReadonlyArray): string | null { + let latest: string | null = null; + for (const value of values) { + if (latest === null || latest.localeCompare(value) < 0) { + latest = value; + } + } + return latest; +} + +function chunkArray( + items: ReadonlyArray, + chunkSize: number, +): ReadonlyArray> { + if (items.length === 0) { + return []; + } + const chunks: Array> = []; + for (let index = 0; index < items.length; index += chunkSize) { + chunks.push(items.slice(index, index + chunkSize)); + } + return chunks; +} + +function readSourceProjects(reader: SourceDatabaseReader): ReadonlyArray { + const tableName = "projection_projects"; + requiredTable(reader, tableName); + const rows = reader.all(` + SELECT + ${selectColumn(reader, tableName, "project_id", "projectId")}, + ${selectColumn(reader, tableName, "kind", "kind", "'project'")}, + ${selectColumn(reader, tableName, "title", "title", "''")}, + ${selectColumn(reader, tableName, "workspace_root", "workspaceRoot", "''")}, + ${selectColumn( + reader, + tableName, + "default_model_selection_json", + "defaultModelSelectionJson", + )}, + ${selectColumn(reader, tableName, "scripts_json", "scriptsJson", "'[]'")}, + ${selectColumn(reader, tableName, "created_at", "createdAt", "''")}, + ${selectColumn(reader, tableName, "updated_at", "updatedAt", "''")}, + ${selectColumn(reader, tableName, "deleted_at", "deletedAt")} + FROM "${tableName}" + ORDER BY "created_at" ASC, "project_id" ASC + `); + + return rows.map((row) => ({ + id: ProjectId.makeUnsafe(row.projectId), + kind: row.kind === "chat" ? "chat" : "project", + title: row.title, + workspaceRoot: row.workspaceRoot, + defaultModelSelection: decodeJsonColumn( + row.defaultModelSelectionJson, + decodeModelSelection, + null, + "projection_projects.default_model_selection_json", + ), + scripts: decodeJsonColumn( + row.scriptsJson, + decodeProjectScripts, + [], + "projection_projects.scripts_json", + ), + createdAt: row.createdAt, + updatedAt: row.updatedAt, + deletedAt: row.deletedAt, + })); +} + +function readSourceThreads(reader: SourceDatabaseReader): ReadonlyArray { + const tableName = "projection_threads"; + requiredTable(reader, tableName); + return reader.all(` + SELECT + ${selectColumn(reader, tableName, "thread_id", "threadId")}, + ${selectColumn(reader, tableName, "project_id", "projectId")}, + ${selectColumn(reader, tableName, "title", "title", "''")}, + ${selectColumn(reader, tableName, "model_selection_json", "modelSelectionJson", "'{}'")}, + ${selectColumn(reader, tableName, "runtime_mode", "runtimeMode", "'full-access'")}, + ${selectColumn(reader, tableName, "interaction_mode", "interactionMode", "'default'")}, + ${selectColumn(reader, tableName, "env_mode", "envMode", "'local'")}, + ${selectColumn(reader, tableName, "branch", "branch")}, + ${selectColumn(reader, tableName, "worktree_path", "worktreePath")}, + ${selectColumn(reader, tableName, "associated_worktree_path", "associatedWorktreePath")}, + ${selectColumn(reader, tableName, "associated_worktree_branch", "associatedWorktreeBranch")}, + ${selectColumn(reader, tableName, "associated_worktree_ref", "associatedWorktreeRef")}, + ${selectColumn(reader, tableName, "parent_thread_id", "parentThreadId")}, + ${selectColumn(reader, tableName, "subagent_agent_id", "subagentAgentId")}, + ${selectColumn(reader, tableName, "subagent_nickname", "subagentNickname")}, + ${selectColumn(reader, tableName, "subagent_role", "subagentRole")}, + ${selectColumn(reader, tableName, "last_known_pr_json", "lastKnownPrJson")}, + ${selectColumn(reader, tableName, "handoff_json", "handoffJson")}, + ${selectColumn(reader, tableName, "created_at", "createdAt", "''")}, + ${selectColumn(reader, tableName, "updated_at", "updatedAt", "''")}, + ${selectColumn(reader, tableName, "archived_at", "archivedAt")}, + ${selectColumn(reader, tableName, "deleted_at", "deletedAt")} + FROM "${tableName}" + ORDER BY "created_at" ASC, "thread_id" ASC + `); +} + +function readSourceMessages( + reader: SourceDatabaseReader, +): ReadonlyMap> { + const tableName = "projection_thread_messages"; + requiredTable(reader, tableName); + const rows = reader.all(` + SELECT + ${selectColumn(reader, tableName, "message_id", "messageId")}, + ${selectColumn(reader, tableName, "thread_id", "threadId")}, + ${selectColumn(reader, tableName, "role", "role", "''")}, + ${selectColumn(reader, tableName, "text", "text", "''")}, + ${selectColumn(reader, tableName, "turn_id", "turnId")}, + ${selectColumn(reader, tableName, "attachments_json", "attachmentsJson")}, + ${selectColumn(reader, tableName, "skills_json", "skillsJson")}, + ${selectColumn(reader, tableName, "mentions_json", "mentionsJson")}, + ${selectColumn(reader, tableName, "dispatch_mode", "dispatchMode")}, + ${selectColumn(reader, tableName, "is_streaming", "isStreaming", "0")}, + ${selectColumn(reader, tableName, "source", "source", "'native'")}, + ${selectColumn(reader, tableName, "created_at", "createdAt", "''")}, + ${selectColumn(reader, tableName, "updated_at", "updatedAt", "created_at")} + FROM "${tableName}" + ORDER BY "thread_id" ASC, "created_at" ASC, "message_id" ASC + `); + + const messagesByThread = new Map(); + for (const row of rows) { + const role = + row.role === "assistant" || row.role === "system" || row.role === "user" ? row.role : null; + if (role === null) { + continue; + } + const source = + row.source === "handoff-import" || row.source === "fork-import" || row.source === "native" + ? row.source + : "native"; + const dispatchMode = + row.dispatchMode === "queue" || row.dispatchMode === "steer" ? row.dispatchMode : null; + const threadId = ThreadId.makeUnsafe(row.threadId); + const entries = messagesByThread.get(threadId) ?? []; + entries.push({ + messageId: row.messageId, + role, + text: row.text, + turnId: row.turnId, + attachments: decodeJsonColumn( + row.attachmentsJson, + decodeChatAttachments, + [], + "projection_thread_messages.attachments_json", + ), + skills: decodeJsonColumn( + row.skillsJson, + decodeProviderSkills, + [], + "projection_thread_messages.skills_json", + ), + mentions: decodeJsonColumn( + row.mentionsJson, + decodeProviderMentions, + [], + "projection_thread_messages.mentions_json", + ), + dispatchMode, + streaming: row.isStreaming === 1, + source, + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }); + messagesByThread.set(threadId, entries); + } + return messagesByThread; +} + +function readSourceActivities( + reader: SourceDatabaseReader, +): ReadonlyMap> { + const tableName = "projection_thread_activities"; + if (!reader.hasTable(tableName)) { + return new Map(); + } + const rows = reader.all(` + SELECT + ${selectColumn(reader, tableName, "activity_id", "activityId")}, + ${selectColumn(reader, tableName, "thread_id", "threadId")}, + ${selectColumn(reader, tableName, "turn_id", "turnId")}, + ${selectColumn(reader, tableName, "tone", "tone", "'info'")}, + ${selectColumn(reader, tableName, "kind", "kind", "''")}, + ${selectColumn(reader, tableName, "summary", "summary", "''")}, + ${selectColumn(reader, tableName, "payload_json", "payloadJson", "'null'")}, + ${selectColumn(reader, tableName, "sequence", "sequence")}, + ${selectColumn(reader, tableName, "created_at", "createdAt", "''")} + FROM "${tableName}" + ORDER BY "thread_id" ASC, "sequence" ASC, "created_at" ASC, "activity_id" ASC + `); + + const activitiesByThread = new Map(); + for (const row of rows) { + const tone = + row.tone === "tool" || row.tone === "approval" || row.tone === "error" ? row.tone : "info"; + const threadId = ThreadId.makeUnsafe(row.threadId); + const entries = activitiesByThread.get(threadId) ?? []; + entries.push({ + id: row.activityId, + turnId: row.turnId, + tone, + kind: row.kind, + summary: row.summary, + payload: decodeJsonColumn( + row.payloadJson, + decodeUnknownJson, + null, + "projection_thread_activities.payload_json", + ), + sequence: row.sequence, + createdAt: row.createdAt, + }); + activitiesByThread.set(threadId, entries); + } + return activitiesByThread; +} + +function readSourceProposedPlans( + reader: SourceDatabaseReader, +): ReadonlyMap> { + const tableName = "projection_thread_proposed_plans"; + if (!reader.hasTable(tableName)) { + return new Map(); + } + const rows = reader.all(` + SELECT + ${selectColumn(reader, tableName, "plan_id", "planId")}, + ${selectColumn(reader, tableName, "thread_id", "threadId")}, + ${selectColumn(reader, tableName, "turn_id", "turnId")}, + ${selectColumn(reader, tableName, "plan_markdown", "planMarkdown", "''")}, + ${selectColumn(reader, tableName, "implemented_at", "implementedAt")}, + ${selectColumn(reader, tableName, "implementation_thread_id", "implementationThreadId")}, + ${selectColumn(reader, tableName, "created_at", "createdAt", "''")}, + ${selectColumn(reader, tableName, "updated_at", "updatedAt", "''")} + FROM "${tableName}" + ORDER BY "thread_id" ASC, "created_at" ASC, "plan_id" ASC + `); + + const plansByThread = new Map(); + for (const row of rows) { + const threadId = ThreadId.makeUnsafe(row.threadId); + const entries = plansByThread.get(threadId) ?? []; + entries.push({ + id: row.planId, + turnId: row.turnId, + planMarkdown: row.planMarkdown, + implementedAt: row.implementedAt, + implementationThreadId: + row.implementationThreadId === null + ? null + : ThreadId.makeUnsafe(row.implementationThreadId), + createdAt: row.createdAt, + updatedAt: row.updatedAt, + }); + plansByThread.set(threadId, entries); + } + return plansByThread; +} + +function readSourceCheckpoints( + reader: SourceDatabaseReader, +): ReadonlyMap> { + const tableName = "projection_turns"; + if (!reader.hasTable(tableName)) { + return new Map(); + } + const rows = reader.all(` + SELECT + ${selectColumn(reader, tableName, "thread_id", "threadId")}, + ${selectColumn(reader, tableName, "turn_id", "turnId")}, + ${selectColumn(reader, tableName, "checkpoint_turn_count", "checkpointTurnCount")}, + ${selectColumn(reader, tableName, "checkpoint_ref", "checkpointRef")}, + ${selectColumn(reader, tableName, "checkpoint_status", "checkpointStatus")}, + ${selectColumn(reader, tableName, "checkpoint_files_json", "checkpointFilesJson", "'[]'")}, + ${selectColumn(reader, tableName, "assistant_message_id", "assistantMessageId")}, + ${selectColumn(reader, tableName, "completed_at", "completedAt")} + FROM "${tableName}" + WHERE "turn_id" IS NOT NULL + AND "checkpoint_turn_count" IS NOT NULL + AND "checkpoint_ref" IS NOT NULL + AND "checkpoint_status" IS NOT NULL + AND "completed_at" IS NOT NULL + ORDER BY "thread_id" ASC, "checkpoint_turn_count" ASC, "turn_id" ASC + `); + + const checkpointsByThread = new Map(); + for (const row of rows) { + const status = + row.checkpointStatus === "missing" || row.checkpointStatus === "error" + ? row.checkpointStatus + : "ready"; + const threadId = ThreadId.makeUnsafe(row.threadId); + const entries = checkpointsByThread.get(threadId) ?? []; + entries.push({ + turnId: row.turnId, + checkpointTurnCount: row.checkpointTurnCount, + checkpointRef: row.checkpointRef, + status, + files: decodeJsonColumn( + row.checkpointFilesJson, + decodeCheckpointFiles, + [], + "projection_turns.checkpoint_files_json", + ), + assistantMessageId: row.assistantMessageId, + completedAt: row.completedAt, + }); + checkpointsByThread.set(threadId, entries); + } + return checkpointsByThread; +} + +function readSourceSnapshot(sourceDbPath: string): SourceSnapshot { + const db = new DatabaseSync(sourceDbPath, { readOnly: true }); + try { + const reader = new SourceDatabaseReader(db); + const projects = readSourceProjects(reader); + const threadRows = readSourceThreads(reader); + const messagesByThread = readSourceMessages(reader); + const activitiesByThread = readSourceActivities(reader); + const proposedPlansByThread = readSourceProposedPlans(reader); + const checkpointsByThread = readSourceCheckpoints(reader); + + const threads: SourceThreadSnapshot[] = threadRows.map((row) => ({ + id: ThreadId.makeUnsafe(row.threadId), + projectId: ProjectId.makeUnsafe(row.projectId), + title: row.title, + modelSelection: decodeJsonColumn( + row.modelSelectionJson, + decodeModelSelection, + { provider: "codex", model: "gpt-5-codex" }, + "projection_threads.model_selection_json", + ), + runtimeMode: row.runtimeMode === "approval-required" ? "approval-required" : "full-access", + interactionMode: row.interactionMode === "plan" ? "plan" : "default", + envMode: row.envMode === "worktree" ? "worktree" : "local", + branch: row.branch, + worktreePath: row.worktreePath, + associatedWorktreePath: row.associatedWorktreePath, + associatedWorktreeBranch: row.associatedWorktreeBranch, + associatedWorktreeRef: row.associatedWorktreeRef, + parentThreadId: row.parentThreadId === null ? null : ThreadId.makeUnsafe(row.parentThreadId), + subagentAgentId: row.subagentAgentId, + subagentNickname: row.subagentNickname, + subagentRole: row.subagentRole, + lastKnownPr: decodeJsonColumn( + row.lastKnownPrJson, + decodeThreadPullRequest, + null, + "projection_threads.last_known_pr_json", + ), + handoff: decodeJsonColumn( + row.handoffJson, + decodeThreadHandoff, + null, + "projection_threads.handoff_json", + ), + createdAt: row.createdAt, + updatedAt: row.updatedAt, + archivedAt: row.archivedAt, + deletedAt: row.deletedAt, + messages: messagesByThread.get(ThreadId.makeUnsafe(row.threadId)) ?? [], + activities: activitiesByThread.get(ThreadId.makeUnsafe(row.threadId)) ?? [], + proposedPlans: proposedPlansByThread.get(ThreadId.makeUnsafe(row.threadId)) ?? [], + checkpoints: checkpointsByThread.get(ThreadId.makeUnsafe(row.threadId)) ?? [], + })); + + return { + projects, + threads, + }; + } finally { + db.close(); + } +} + +const resolveSourcePaths = Effect.fn(function* (input: { + readonly sourceBaseDir?: string; + readonly homeDir: string; + readonly devUrl: URL | undefined; +}) { + const fs = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const sourceBaseDir = path.resolve( + input.sourceBaseDir?.trim().length + ? input.sourceBaseDir.trim() + : path.join(input.homeDir, LEGACY_T3_HOME_DIRNAME), + ); + + const candidates: ReadonlyArray = yield* Effect.all( + [ + deriveServerPaths(sourceBaseDir, undefined).pipe( + Effect.map((paths) => ({ baseDir: sourceBaseDir, ...paths })), + ), + ...(input.devUrl === undefined + ? [] + : [ + deriveServerPaths(sourceBaseDir, input.devUrl).pipe( + Effect.map((paths) => ({ baseDir: sourceBaseDir, ...paths })), + ), + ]), + ], + { concurrency: "unbounded" }, + ); + + const uniqueCandidates = candidates.filter( + (candidate, index, allCandidates) => + allCandidates.findIndex((entry) => entry.stateDir === candidate.stateDir) === index, + ); + + for (const candidate of uniqueCandidates) { + if (yield* fs.exists(candidate.dbPath)) { + return candidate; + } + } + + return yield* new LegacyStateImportError({ + message: `No T3 Code state database was found under '${sourceBaseDir}'. Expected 'userdata/state.sqlite'${ + input.devUrl ? " or 'dev/state.sqlite'" : "" + }.`, + }); +}); + +function findMappedProjectId(input: { + readonly sourceProject: SourceProjectSnapshot; + readonly targetProjects: ReadonlyArray<{ + readonly id: ProjectId; + readonly kind: ProjectKind; + readonly title: string; + readonly workspaceRoot: string; + readonly deletedAt: string | null; + }>; + readonly platform: string; +}): ProjectId | null { + const matchingTarget = input.targetProjects.find((targetProject) => { + if (targetProject.deletedAt !== null) { + return false; + } + if ( + !workspaceRootsEqual(targetProject.workspaceRoot, input.sourceProject.workspaceRoot, { + platform: input.platform, + }) + ) { + return false; + } + if (targetProject.kind === input.sourceProject.kind) { + return true; + } + return input.sourceProject.kind === "chat" && targetProject.title.trim() === "Home"; + }); + return matchingTarget?.id ?? null; +} + +const filterImportableAttachments = Effect.fn(function* (input: { + readonly attachments: ReadonlyArray; + readonly sourceAttachmentsDir: string; + readonly targetAttachmentsDir: string; + readonly counters: ImportCounters; +}) { + const fs = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + + const keptAttachments = yield* Effect.forEach( + input.attachments, + (attachment) => + Effect.gen(function* () { + const sourcePath = resolveAttachmentPath({ + attachmentsDir: input.sourceAttachmentsDir, + attachment, + }); + const targetPath = resolveAttachmentPath({ + attachmentsDir: input.targetAttachmentsDir, + attachment, + }); + if (!sourcePath || !targetPath) { + input.counters.missingAttachments += 1; + return null; + } + if (!(yield* fs.exists(sourcePath))) { + input.counters.missingAttachments += 1; + return null; + } + if (yield* fs.exists(targetPath)) { + input.counters.skippedAttachments += 1; + return attachment; + } + yield* fs.makeDirectory(path.dirname(targetPath), { recursive: true }); + yield* fs.copyFile(sourcePath, targetPath); + input.counters.copiedAttachments += 1; + return attachment; + }), + { concurrency: 1 }, + ); + + return keptAttachments.flatMap((attachment) => (attachment ? [attachment] : [])); +}); + +export const importLegacyT3State = Effect.fn(function* (input: { + readonly sourceBaseDir?: string; +}) { + const serverConfig = yield* ServerConfig; + const orchestrationEngine = yield* OrchestrationEngineService; + + const sourcePaths = yield* resolveSourcePaths({ + homeDir: serverConfig.homeDir, + devUrl: serverConfig.devUrl, + ...(input.sourceBaseDir !== undefined ? { sourceBaseDir: input.sourceBaseDir } : {}), + }); + + const sourceSnapshot = yield* Effect.try({ + try: () => readSourceSnapshot(sourcePaths.dbPath), + catch: (cause): LegacyStateImportError => + isLegacyStateImportError(cause) + ? cause + : new LegacyStateImportError({ + message: `Failed to read the legacy T3 database at '${sourcePaths.dbPath}'.`, + cause, + }), + }); + + const currentReadModel = yield* orchestrationEngine.getReadModel(); + const counters: ImportCounters = { + importedProjects: 0, + mappedProjects: 0, + skippedProjects: 0, + importedThreads: 0, + skippedThreads: 0, + importedMessages: 0, + importedActivities: 0, + importedProposedPlans: 0, + importedCheckpoints: 0, + copiedAttachments: 0, + skippedAttachments: 0, + missingAttachments: 0, + }; + + const targetProjects: Array<{ + readonly id: ProjectId; + readonly kind: ProjectKind; + readonly title: string; + readonly workspaceRoot: string; + readonly deletedAt: string | null; + }> = currentReadModel.projects.map((project) => ({ + id: project.id, + kind: project.kind ?? "project", + title: project.title, + workspaceRoot: project.workspaceRoot, + deletedAt: project.deletedAt, + })); + const existingProjectIds = new Set(currentReadModel.projects.map((project) => project.id)); + const existingThreadsById = new Map( + currentReadModel.threads.map((thread) => [thread.id, thread]), + ); + const targetProjectIdBySourceProjectId = new Map(); + + for (const sourceProject of sourceSnapshot.projects) { + if (sourceProject.deletedAt !== null) { + counters.skippedProjects += 1; + continue; + } + + const mappedProjectId = findMappedProjectId({ + sourceProject, + targetProjects, + platform: process.platform, + }); + if (mappedProjectId) { + counters.mappedProjects += 1; + targetProjectIdBySourceProjectId.set(sourceProject.id, mappedProjectId); + continue; + } + + if (existingProjectIds.has(sourceProject.id)) { + counters.skippedProjects += 1; + continue; + } + + yield* orchestrationEngine.dispatch({ + type: "project.create", + commandId: asCommandId(), + projectId: sourceProject.id, + kind: sourceProject.kind, + title: asTrimmedNonEmptyString(sourceProject.title), + workspaceRoot: asTrimmedNonEmptyString(sourceProject.workspaceRoot), + defaultModelSelection: sourceProject.defaultModelSelection, + createdAt: sourceProject.createdAt, + }); + + if (sourceProject.scripts.length > 0) { + yield* orchestrationEngine.dispatch({ + type: "project.meta.update", + commandId: asCommandId(), + projectId: sourceProject.id, + scripts: sourceProject.scripts, + }); + } + + existingProjectIds.add(sourceProject.id); + targetProjects.push({ + id: sourceProject.id, + kind: sourceProject.kind, + title: sourceProject.title, + workspaceRoot: sourceProject.workspaceRoot, + deletedAt: null, + }); + targetProjectIdBySourceProjectId.set(sourceProject.id, sourceProject.id); + counters.importedProjects += 1; + } + + const activeSourceThreads = sourceSnapshot.threads.filter((thread) => thread.deletedAt === null); + for (const sourceThread of activeSourceThreads) { + const targetProjectId = targetProjectIdBySourceProjectId.get(sourceThread.projectId); + if (!targetProjectId) { + counters.skippedThreads += 1; + continue; + } + const existingThread = existingThreadsById.get(sourceThread.id) ?? null; + if (existingThread === null) { + yield* orchestrationEngine.dispatch({ + type: "thread.create", + commandId: asCommandId(), + threadId: sourceThread.id, + projectId: targetProjectId, + title: asTrimmedNonEmptyString(sourceThread.title), + modelSelection: sourceThread.modelSelection, + runtimeMode: sourceThread.runtimeMode, + interactionMode: sourceThread.interactionMode, + envMode: sourceThread.envMode, + branch: asOptionalTrimmedNonEmptyString(sourceThread.branch), + worktreePath: asOptionalTrimmedNonEmptyString(sourceThread.worktreePath), + associatedWorktreePath: asOptionalTrimmedNonEmptyString( + sourceThread.associatedWorktreePath, + ), + associatedWorktreeBranch: asOptionalTrimmedNonEmptyString( + sourceThread.associatedWorktreeBranch, + ), + associatedWorktreeRef: asOptionalTrimmedNonEmptyString(sourceThread.associatedWorktreeRef), + parentThreadId: sourceThread.parentThreadId, + subagentAgentId: asOptionalTrimmedNonEmptyString(sourceThread.subagentAgentId), + subagentNickname: asOptionalTrimmedNonEmptyString(sourceThread.subagentNickname), + subagentRole: asOptionalTrimmedNonEmptyString(sourceThread.subagentRole), + lastKnownPr: sourceThread.lastKnownPr, + createdAt: sourceThread.createdAt, + }); + + if (sourceThread.handoff !== null) { + yield* orchestrationEngine.dispatch({ + type: "thread.meta.update", + commandId: asCommandId(), + threadId: sourceThread.id, + handoff: sourceThread.handoff, + }); + } + + counters.importedThreads += 1; + } else { + counters.skippedThreads += 1; + } + + const importedMessages = yield* Effect.forEach( + sourceThread.messages, + (message) => + Effect.gen(function* () { + const attachments = yield* filterImportableAttachments({ + attachments: message.attachments, + sourceAttachmentsDir: sourcePaths.attachmentsDir, + targetAttachmentsDir: serverConfig.attachmentsDir, + counters, + }); + return { + messageId: asMessageId(message.messageId), + role: message.role, + text: message.text, + turnId: message.turnId === null ? null : asTurnId(message.turnId), + streaming: message.streaming, + source: message.source, + ...(attachments.length > 0 ? { attachments } : {}), + ...(message.skills.length > 0 ? { skills: message.skills } : {}), + ...(message.mentions.length > 0 ? { mentions: message.mentions } : {}), + ...(message.dispatchMode !== null ? { dispatchMode: message.dispatchMode } : {}), + createdAt: message.createdAt, + updatedAt: message.updatedAt, + }; + }), + { concurrency: 1 }, + ); + + for (const messageChunk of chunkArray(importedMessages, 250)) { + const createdAt = + maxIso(messageChunk.map((message) => message.updatedAt)) ?? sourceThread.updatedAt; + yield* orchestrationEngine.dispatch({ + type: "thread.messages.import", + commandId: asCommandId(), + threadId: sourceThread.id, + messages: messageChunk, + createdAt, + }); + counters.importedMessages += messageChunk.length; + } + + const importedProposedPlans = sourceThread.proposedPlans.map((proposedPlan) => ({ + id: asProposedPlanId(proposedPlan.id), + turnId: proposedPlan.turnId === null ? null : asTurnId(proposedPlan.turnId), + planMarkdown: asTrimmedNonEmptyString(proposedPlan.planMarkdown), + implementedAt: proposedPlan.implementedAt, + implementationThreadId: proposedPlan.implementationThreadId, + createdAt: proposedPlan.createdAt, + updatedAt: proposedPlan.updatedAt, + })); + + for (const proposedPlanChunk of chunkArray(importedProposedPlans, 100)) { + const createdAt = + maxIso(proposedPlanChunk.map((proposedPlan) => proposedPlan.updatedAt)) ?? + sourceThread.updatedAt; + yield* orchestrationEngine.dispatch({ + type: "thread.proposed-plans.import", + commandId: asCommandId(), + threadId: sourceThread.id, + proposedPlans: proposedPlanChunk, + createdAt, + }); + counters.importedProposedPlans += proposedPlanChunk.length; + } + + const importedActivities = sourceThread.activities.map((activity) => ({ + id: asEventId(activity.id), + tone: activity.tone, + kind: asTrimmedNonEmptyString(activity.kind), + summary: asTrimmedNonEmptyString(activity.summary), + payload: activity.payload, + turnId: activity.turnId === null ? null : asTurnId(activity.turnId), + ...(activity.sequence !== null ? { sequence: asNonNegativeInt(activity.sequence) } : {}), + createdAt: activity.createdAt, + })); + + for (const activityChunk of chunkArray(importedActivities, 500)) { + const createdAt = + maxIso(activityChunk.map((activity) => activity.createdAt)) ?? sourceThread.updatedAt; + yield* orchestrationEngine.dispatch({ + type: "thread.activities.import", + commandId: asCommandId(), + threadId: sourceThread.id, + activities: activityChunk, + createdAt, + }); + counters.importedActivities += activityChunk.length; + } + + for (const checkpoint of sourceThread.checkpoints) { + yield* orchestrationEngine.dispatch({ + type: "thread.turn.diff.complete", + commandId: asCommandId(), + threadId: sourceThread.id, + turnId: asTurnId(checkpoint.turnId), + checkpointTurnCount: asNonNegativeInt(checkpoint.checkpointTurnCount), + checkpointRef: asCheckpointRef(checkpoint.checkpointRef), + status: checkpoint.status, + files: checkpoint.files, + ...(checkpoint.assistantMessageId !== null + ? { assistantMessageId: asMessageId(checkpoint.assistantMessageId) } + : {}), + completedAt: checkpoint.completedAt, + createdAt: checkpoint.completedAt, + }); + counters.importedCheckpoints += 1; + } + + if (sourceThread.archivedAt !== null && existingThread?.archivedAt == null) { + yield* orchestrationEngine.dispatch({ + type: "thread.archive", + commandId: asCommandId(), + threadId: sourceThread.id, + }); + } + } + + return { + sourceBaseDir: asTrimmedNonEmptyString(sourcePaths.baseDir), + sourceStateDir: asTrimmedNonEmptyString(sourcePaths.stateDir), + ...counters, + } satisfies typeof OrchestrationImportLegacyT3StateResult.Type; +}); diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 3656e576..3c2c2059 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -18,7 +18,6 @@ import { type RuntimeMode, type ServerConfigShape, } from "./config"; -import { migrateLegacyHomeIfNeeded } from "./homeMigration"; import { fixPath, resolveBaseDir } from "./os-jank"; import { Open } from "./open"; import * as SqlitePersistence from "./persistence/Layers/Sqlite"; @@ -155,20 +154,6 @@ const ServerConfigLive = (input: CliInput) => const devUrl = Option.getOrElse(input.devUrl, () => env.devUrl); const baseDir = yield* resolveBaseDir(Option.getOrUndefined(input.t3Home) ?? env.t3Home); - // Import legacy ~/.t3 state before runtime paths are derived under ~/.dpcode. - yield* migrateLegacyHomeIfNeeded({ - baseDir, - homeDir: OS.homedir(), - devUrl, - }).pipe( - Effect.mapError( - (cause) => - new StartupError({ - message: "Failed to migrate legacy T3 home directory", - cause, - }), - ), - ); const derivedPaths = yield* deriveServerPaths(baseDir, devUrl); const noBrowser = resolveBooleanFlag(input.noBrowser, env.noBrowser ?? mode === "desktop"); const authToken = Option.getOrUndefined(input.authToken) ?? env.authToken; diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index e9ff8436..344f934c 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -2,6 +2,8 @@ import { ApprovalRequestId, type ChatAttachment, type OrchestrationEvent, + type ThreadId, + type TurnId, } from "@t3tools/contracts"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { Effect, FileSystem, Layer, Option, Path, Stream } from "effect"; @@ -131,6 +133,67 @@ function extractActivityRequestId(payload: unknown): ApprovalRequestId | null { return typeof requestId === "string" ? ApprovalRequestId.makeUnsafe(requestId) : null; } +const applyPendingApprovalActivityProjection = Effect.fn(function* (input: { + readonly projectionPendingApprovalRepository: ProjectionPendingApprovalRepositoryShape; + readonly threadId: ThreadId; + readonly activity: { + readonly turnId: TurnId | null; + readonly kind: string; + readonly payload: unknown; + readonly createdAt: string; + }; + readonly fallbackRequestId?: ApprovalRequestId | null; +}) { + const requestId = + extractActivityRequestId(input.activity.payload) ?? input.fallbackRequestId ?? null; + if (requestId === null) { + return; + } + + const existingRow = yield* input.projectionPendingApprovalRepository.getByRequestId({ + requestId, + }); + if (input.activity.kind === "approval.resolved") { + const resolvedDecisionRaw = + typeof input.activity.payload === "object" && + input.activity.payload !== null && + "decision" in input.activity.payload + ? (input.activity.payload as { decision?: unknown }).decision + : null; + const resolvedDecision = + resolvedDecisionRaw === "accept" || + resolvedDecisionRaw === "acceptForSession" || + resolvedDecisionRaw === "decline" || + resolvedDecisionRaw === "cancel" + ? resolvedDecisionRaw + : null; + yield* input.projectionPendingApprovalRepository.upsert({ + requestId, + threadId: Option.isSome(existingRow) ? existingRow.value.threadId : input.threadId, + turnId: Option.isSome(existingRow) ? existingRow.value.turnId : input.activity.turnId, + status: "resolved", + decision: resolvedDecision, + createdAt: Option.isSome(existingRow) + ? existingRow.value.createdAt + : input.activity.createdAt, + resolvedAt: input.activity.createdAt, + }); + return; + } + if (Option.isSome(existingRow) && existingRow.value.status === "resolved") { + return; + } + yield* input.projectionPendingApprovalRepository.upsert({ + requestId, + threadId: input.threadId, + turnId: input.activity.turnId, + status: "pending", + decision: null, + createdAt: Option.isSome(existingRow) ? existingRow.value.createdAt : input.activity.createdAt, + resolvedAt: null, + }); +}); + // Recompute the denormalized sidebar shell summary after per-thread timeline changes. const withRefreshedThreadShellSummary = Effect.fn(function* (input: { readonly thread: ProjectionThread; @@ -513,6 +576,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } yield* projectionThreadRepository.upsert({ ...existingRow.value, + ...(event.payload.projectId !== undefined + ? { projectId: event.payload.projectId } + : {}), ...(event.payload.title !== undefined ? { title: event.payload.title } : {}), ...(event.payload.modelSelection !== undefined ? { modelSelection: event.payload.modelSelection } @@ -631,8 +697,11 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } case "thread.message-sent": + case "thread.messages-imported": case "thread.proposed-plan-upserted": + case "thread.proposed-plans-imported": case "thread.activity-appended": + case "thread.activities-imported": case "thread.approval-response-requested": case "thread.reverted": { const existingRow = yield* projectionThreadRepository.getById({ @@ -751,6 +820,31 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { return; } + case "thread.messages-imported": + yield* Effect.forEach( + event.payload.messages, + (message) => + projectionThreadMessageRepository.upsert({ + messageId: message.messageId, + threadId: event.payload.threadId, + turnId: message.turnId ?? null, + role: message.role, + text: message.text, + ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), + ...(message.skills !== undefined ? { skills: message.skills } : {}), + ...(message.mentions !== undefined ? { mentions: message.mentions } : {}), + ...(message.dispatchMode !== undefined + ? { dispatchMode: message.dispatchMode } + : {}), + isStreaming: message.streaming ?? false, + source: message.source ?? "native", + createdAt: message.createdAt, + updatedAt: message.updatedAt, + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); + return; + case "thread.reverted": { const existingRows = yield* projectionThreadMessageRepository.listByThreadId({ threadId: event.payload.threadId, @@ -808,6 +902,24 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { }); return; + case "thread.proposed-plans-imported": + yield* Effect.forEach( + event.payload.proposedPlans, + (proposedPlan) => + projectionThreadProposedPlanRepository.upsert({ + planId: proposedPlan.id, + threadId: event.payload.threadId, + turnId: proposedPlan.turnId, + planMarkdown: proposedPlan.planMarkdown, + implementedAt: proposedPlan.implementedAt, + implementationThreadId: proposedPlan.implementationThreadId, + createdAt: proposedPlan.createdAt, + updatedAt: proposedPlan.updatedAt, + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); + return; + case "thread.reverted": { const existingRows = yield* projectionThreadProposedPlanRepository.listByThreadId({ threadId: event.payload.threadId, @@ -864,6 +976,25 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { }); return; + case "thread.activities-imported": + yield* Effect.forEach( + event.payload.activities, + (activity) => + projectionThreadActivityRepository.upsert({ + activityId: activity.id, + threadId: event.payload.threadId, + turnId: activity.turnId, + tone: activity.tone, + kind: activity.kind, + summary: activity.summary, + payload: activity.payload, + ...(activity.sequence !== undefined ? { sequence: activity.sequence } : {}), + createdAt: activity.createdAt, + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); + return; + case "thread.reverted": { const existingRows = yield* projectionThreadActivityRepository.listByThreadId({ threadId: event.payload.threadId, @@ -1225,64 +1356,29 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { Effect.gen(function* () { switch (event.type) { case "thread.activity-appended": { - const requestId = - extractActivityRequestId(event.payload.activity.payload) ?? - event.metadata.requestId ?? - null; - if (requestId === null) { - return; - } - const existingRow = yield* projectionPendingApprovalRepository.getByRequestId({ - requestId, - }); - if (event.payload.activity.kind === "approval.resolved") { - const resolvedDecisionRaw = - typeof event.payload.activity.payload === "object" && - event.payload.activity.payload !== null && - "decision" in event.payload.activity.payload - ? (event.payload.activity.payload as { decision?: unknown }).decision - : null; - const resolvedDecision = - resolvedDecisionRaw === "accept" || - resolvedDecisionRaw === "acceptForSession" || - resolvedDecisionRaw === "decline" || - resolvedDecisionRaw === "cancel" - ? resolvedDecisionRaw - : null; - yield* projectionPendingApprovalRepository.upsert({ - requestId, - threadId: Option.isSome(existingRow) - ? existingRow.value.threadId - : event.payload.threadId, - turnId: Option.isSome(existingRow) - ? existingRow.value.turnId - : event.payload.activity.turnId, - status: "resolved", - decision: resolvedDecision, - createdAt: Option.isSome(existingRow) - ? existingRow.value.createdAt - : event.payload.activity.createdAt, - resolvedAt: event.payload.activity.createdAt, - }); - return; - } - if (Option.isSome(existingRow) && existingRow.value.status === "resolved") { - return; - } - yield* projectionPendingApprovalRepository.upsert({ - requestId, + yield* applyPendingApprovalActivityProjection({ + projectionPendingApprovalRepository, threadId: event.payload.threadId, - turnId: event.payload.activity.turnId, - status: "pending", - decision: null, - createdAt: Option.isSome(existingRow) - ? existingRow.value.createdAt - : event.payload.activity.createdAt, - resolvedAt: null, + activity: event.payload.activity, + fallbackRequestId: event.metadata.requestId ?? null, }); return; } + case "thread.activities-imported": { + yield* Effect.forEach( + event.payload.activities, + (activity) => + applyPendingApprovalActivityProjection({ + projectionPendingApprovalRepository, + threadId: event.payload.threadId, + activity, + }), + { concurrency: 1 }, + ).pipe(Effect.asVoid); + return; + } + case "thread.approval-response-requested": { const existingRow = yield* projectionPendingApprovalRepository.getByRequestId({ requestId: event.payload.requestId, diff --git a/apps/server/src/orchestration/Schemas.ts b/apps/server/src/orchestration/Schemas.ts index f7ebf693..1bb01e77 100644 --- a/apps/server/src/orchestration/Schemas.ts +++ b/apps/server/src/orchestration/Schemas.ts @@ -12,6 +12,9 @@ import { ThreadMessageSentPayload as ContractsThreadMessageSentPayloadSchema, ThreadProposedPlanUpsertedPayload as ContractsThreadProposedPlanUpsertedPayloadSchema, ThreadSessionSetPayload as ContractsThreadSessionSetPayloadSchema, + ThreadMessagesImportedPayload as ContractsThreadMessagesImportedPayloadSchema, + ThreadProposedPlansImportedPayload as ContractsThreadProposedPlansImportedPayloadSchema, + ThreadActivitiesImportedPayload as ContractsThreadActivitiesImportedPayloadSchema, ThreadTurnDiffCompletedPayload as ContractsThreadTurnDiffCompletedPayloadSchema, ThreadRevertedPayload as ContractsThreadRevertedPayloadSchema, ThreadActivityAppendedPayload as ContractsThreadActivityAppendedPayloadSchema, @@ -41,6 +44,9 @@ export const ThreadSessionSetPayload = ContractsThreadSessionSetPayloadSchema; export const ThreadTurnDiffCompletedPayload = ContractsThreadTurnDiffCompletedPayloadSchema; export const ThreadRevertedPayload = ContractsThreadRevertedPayloadSchema; export const ThreadActivityAppendedPayload = ContractsThreadActivityAppendedPayloadSchema; +export const ThreadMessagesImportedPayload = ContractsThreadMessagesImportedPayloadSchema; +export const ThreadProposedPlansImportedPayload = ContractsThreadProposedPlansImportedPayloadSchema; +export const ThreadActivitiesImportedPayload = ContractsThreadActivitiesImportedPayloadSchema; export const ThreadTurnStartRequestedPayload = ContractsThreadTurnStartRequestedPayloadSchema; export const ThreadTurnInterruptRequestedPayload = diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index eb7a0a48..092450ed 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -13,6 +13,7 @@ import { OrchestrationCommandInvariantError } from "./Errors.ts"; import { hasNativeHandoffMessages } from "./handoff.ts"; import { requireProject, + requireProjectActive, requireProjectAbsent, requireProjectHasNoThreads, requireProjectWorkspaceRootAvailable, @@ -543,6 +544,13 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); + if (command.projectId !== undefined) { + yield* requireProjectActive({ + readModel, + command, + projectId: command.projectId, + }); + } const occurredAt = nowIso(); return { ...withEventBase({ @@ -554,6 +562,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" type: "thread.meta-updated", payload: { threadId: command.threadId, + ...(command.projectId !== undefined ? { projectId: command.projectId } : {}), ...(command.title !== undefined ? { title: command.title } : {}), ...(command.modelSelection !== undefined ? { modelSelection: command.modelSelection } @@ -929,27 +938,61 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" command, threadId: command.threadId, }); - return command.messages.map((message) => ({ + return { ...withEventBase({ aggregateKind: "thread", aggregateId: command.threadId, occurredAt: command.createdAt, commandId: command.commandId, }), - type: "thread.message-sent" as const, + type: "thread.messages-imported" as const, payload: { threadId: command.threadId, - messageId: message.messageId, - role: message.role, - text: message.text, - ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), - turnId: null, - streaming: false, - source: "native" as const, - createdAt: message.createdAt, - updatedAt: message.updatedAt, + messages: command.messages, }, - })); + }; + } + + case "thread.proposed-plans.import": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.proposed-plans-imported", + payload: { + threadId: command.threadId, + proposedPlans: command.proposedPlans, + }, + }; + } + + case "thread.activities.import": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.activities-imported", + payload: { + threadId: command.threadId, + activities: command.activities, + }, + }; } case "thread.message.assistant.delta": { diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 106d9ac5..6b505aa4 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -10,6 +10,7 @@ import { Effect, Schema } from "effect"; import { toProjectorDecodeError, type OrchestrationProjectorDecodeError } from "./Errors.ts"; import { MessageSentPayloadSchema, + ThreadActivitiesImportedPayload, ProjectCreatedPayload, ProjectDeletedPayload, ProjectMetaUpdatedPayload, @@ -18,7 +19,9 @@ import { ThreadCreatedPayload, ThreadDeletedPayload, ThreadInteractionModeSetPayload, + ThreadMessagesImportedPayload, ThreadMetaUpdatedPayload, + ThreadProposedPlansImportedPayload, ThreadProposedPlanUpsertedPayload, ThreadRuntimeModeSetPayload, ThreadUnarchivedPayload, @@ -27,7 +30,7 @@ import { ThreadTurnDiffCompletedPayload, } from "./Schemas.ts"; -type ThreadPatch = Partial>; +type ThreadPatch = Partial>; const MAX_THREAD_MESSAGES = 2_000; const MAX_THREAD_CHECKPOINTS = 500; @@ -164,6 +167,39 @@ function compareThreadActivities( return left.createdAt.localeCompare(right.createdAt) || left.id.localeCompare(right.id); } +function maxIso(left: string, right: string): string { + return left > right ? left : right; +} + +function upsertThreadMessage( + messages: ReadonlyArray, + message: OrchestrationMessage, +): ReadonlyArray { + const existingMessage = messages.find((entry) => entry.id === message.id); + const nextMessages = existingMessage + ? messages.map((entry) => + entry.id === message.id + ? { + ...entry, + text: message.streaming + ? `${entry.text}${message.text}` + : message.text.length > 0 + ? message.text + : entry.text, + streaming: message.streaming, + source: message.source, + updatedAt: message.updatedAt, + turnId: message.turnId, + ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), + ...(message.skills !== undefined ? { skills: message.skills } : {}), + ...(message.mentions !== undefined ? { mentions: message.mentions } : {}), + } + : entry, + ) + : [...messages, message]; + return nextMessages.slice(-MAX_THREAD_MESSAGES); +} + export function createEmptyReadModel(nowIso: string): OrchestrationReadModel { return { snapshotSequence: 0, @@ -347,6 +383,7 @@ export function projectEvent( Effect.map((payload) => ({ ...nextBase, threads: updateThread(nextBase.threads, payload.threadId, { + ...(payload.projectId !== undefined ? { projectId: payload.projectId } : {}), ...(payload.title !== undefined ? { title: payload.title } : {}), ...(payload.modelSelection !== undefined ? { modelSelection: payload.modelSelection } @@ -438,38 +475,64 @@ export function projectEvent( event.type, "message", ); - - const existingMessage = thread.messages.find((entry) => entry.id === message.id); - const messages = existingMessage - ? thread.messages.map((entry) => - entry.id === message.id - ? { - ...entry, - text: message.streaming - ? `${entry.text}${message.text}` - : message.text.length > 0 - ? message.text - : entry.text, - streaming: message.streaming, - source: message.source, - updatedAt: message.updatedAt, - turnId: message.turnId, - ...(message.attachments !== undefined - ? { attachments: message.attachments } - : {}), - ...(message.skills !== undefined ? { skills: message.skills } : {}), - ...(message.mentions !== undefined ? { mentions: message.mentions } : {}), - } - : entry, - ) - : [...thread.messages, message]; - const cappedMessages = messages.slice(-MAX_THREAD_MESSAGES); + const cappedMessages = upsertThreadMessage(thread.messages, message); return { ...nextBase, threads: updateThread(nextBase.threads, payload.threadId, { messages: cappedMessages, - updatedAt: event.occurredAt, + updatedAt: maxIso(thread.updatedAt, message.updatedAt), + }), + }; + }); + + case "thread.messages-imported": + return Effect.gen(function* () { + const payload = yield* decodeForEvent( + ThreadMessagesImportedPayload, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + let nextMessages = thread.messages; + let nextUpdatedAt = thread.updatedAt; + for (const importedMessage of payload.messages) { + const message: OrchestrationMessage = yield* decodeForEvent( + OrchestrationMessage, + { + id: importedMessage.messageId, + role: importedMessage.role, + text: importedMessage.text, + ...(importedMessage.attachments !== undefined + ? { attachments: importedMessage.attachments } + : {}), + ...(importedMessage.skills !== undefined ? { skills: importedMessage.skills } : {}), + ...(importedMessage.mentions !== undefined + ? { mentions: importedMessage.mentions } + : {}), + turnId: importedMessage.turnId, + streaming: importedMessage.streaming, + source: importedMessage.source, + createdAt: importedMessage.createdAt, + updatedAt: importedMessage.updatedAt, + }, + event.type, + "message", + ); + nextMessages = upsertThreadMessage(nextMessages, message); + nextUpdatedAt = maxIso(nextUpdatedAt, message.updatedAt); + } + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + messages: nextMessages, + updatedAt: nextUpdatedAt, }), }; }); @@ -558,6 +621,46 @@ export function projectEvent( }; }); + case "thread.proposed-plans-imported": + return Effect.gen(function* () { + const payload = yield* decodeForEvent( + ThreadProposedPlansImportedPayload, + event.payload, + event.type, + "payload", + ); + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const proposedPlans = [ + ...thread.proposedPlans.filter( + (existingPlan) => + !payload.proposedPlans.some((incomingPlan) => incomingPlan.id === existingPlan.id), + ), + ...payload.proposedPlans, + ] + .toSorted( + (left, right) => + left.createdAt.localeCompare(right.createdAt) || left.id.localeCompare(right.id), + ) + .slice(-200); + + const updatedAt = payload.proposedPlans.reduce( + (latest, plan) => maxIso(latest, plan.updatedAt), + thread.updatedAt, + ); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + proposedPlans, + updatedAt, + }), + }; + }); + case "thread.turn-diff-completed": return Effect.gen(function* () { const payload = yield* decodeForEvent( @@ -707,6 +810,46 @@ export function projectEvent( }), ); + case "thread.activities-imported": + return decodeForEvent( + ThreadActivitiesImportedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => { + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const activities = [ + ...thread.activities.filter( + (existingActivity) => + !payload.activities.some( + (incomingActivity) => incomingActivity.id === existingActivity.id, + ), + ), + ...payload.activities, + ] + .toSorted(compareThreadActivities) + .slice(-500); + + const updatedAt = payload.activities.reduce( + (latest, activity) => maxIso(latest, activity.createdAt), + thread.updatedAt, + ); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + activities, + updatedAt, + }), + }; + }), + ); + default: return Effect.succeed(nextBase); } diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index 5a5c9e52..36ac3114 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -43,6 +43,7 @@ import Migration0027 from "./Migrations/027_BackfillProjectionThreadShellSummary import Migration0028 from "./Migrations/028_ProjectionProjectsKind.ts"; import Migration0029 from "./Migrations/029_ProjectionThreadsLastKnownPr.ts"; import Migration0030 from "./Migrations/030_ProjectionThreadMessagesDispatchMode.ts"; +import Migration0031 from "./Migrations/031_RepairLegacyProjectionSchema.ts"; /** * Migration loader with all migrations defined inline. @@ -85,6 +86,7 @@ export const migrationEntries = [ [28, "ProjectionProjectsKind", Migration0028], [29, "ProjectionThreadsLastKnownPr", Migration0029], [30, "ProjectionThreadMessagesDispatchMode", Migration0030], + [31, "RepairLegacyProjectionSchema", Migration0031], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.test.ts b/apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.test.ts new file mode 100644 index 00000000..3fcb8d43 --- /dev/null +++ b/apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.test.ts @@ -0,0 +1,180 @@ +import { assert, it } from "@effect/vitest"; +import { Effect, Layer } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +import { runMigrations } from "../Migrations.ts"; +import * as NodeSqliteClient from "../NodeSqliteClient.ts"; + +const layer = it.layer(Layer.mergeAll(NodeSqliteClient.layerMemory())); + +layer("031_RepairLegacyProjectionSchema", (it) => { + it.effect( + "repairs projection columns when legacy desktop builds already consumed ids 17-30", + () => + Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* runMigrations({ toMigrationInclusive: 16 }); + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + default_model_selection_json, + scripts_json, + created_at, + updated_at, + deleted_at + ) VALUES ( + 'project-1', + 'Project', + '/repo/project', + '{"provider":"codex","model":"gpt-5.4"}', + '[]', + '2026-01-01T00:00:00.000Z', + '2026-01-01T00:00:00.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + created_at, + updated_at, + deleted_at + ) VALUES ( + 'thread-1', + 'project-1', + 'Thread', + '{"provider":"codex","model":"gpt-5.4"}', + 'full-access', + 'default', + 'feature/test', + '/repo/project/.worktrees/feature-test', + NULL, + '2026-01-01T00:00:00.000Z', + '2026-01-01T00:00:00.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_thread_messages ( + message_id, + thread_id, + turn_id, + role, + text, + is_streaming, + created_at, + updated_at, + attachments_json + ) VALUES ( + 'message-1', + 'thread-1', + NULL, + 'user', + 'Hello', + 0, + '2026-01-01T00:00:00.000Z', + '2026-01-01T00:00:00.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO effect_sql_migrations (migration_id, name) + VALUES + (17, 'ProjectionThreadsArchivedAt'), + (18, 'ProjectionThreadsArchivedAtIndex'), + (19, 'ProjectionSnapshotLookupIndexes'), + (20, 'AuthAccessManagement'), + (21, 'AuthSessionClientMetadata'), + (22, 'AuthSessionLastConnectedAt'), + (23, 'ProjectionThreadShellSummary'), + (24, 'BackfillProjectionThreadShellSummary'), + (25, 'ProjectionThreadsSubagents'), + (26, 'ProjectionThreadShellSummary'), + (27, 'BackfillProjectionThreadShellSummary'), + (28, 'ProjectionProjectsKind'), + (29, 'ProjectionThreadsLastKnownPr'), + (30, 'ProjectionThreadMessagesDispatchMode') + `; + + yield* runMigrations({ toMigrationInclusive: 31 }); + + const repairedThreadRows = yield* sql<{ + readonly envMode: string | null; + readonly associatedWorktreePath: string | null; + readonly associatedWorktreeBranch: string | null; + readonly associatedWorktreeRef: string | null; + readonly handoffJson: string | null; + readonly forkSourceThreadId: string | null; + readonly pendingApprovalCount: number | null; + readonly pendingUserInputCount: number | null; + readonly hasActionableProposedPlan: number | null; + }>` + SELECT + env_mode AS "envMode", + associated_worktree_path AS "associatedWorktreePath", + associated_worktree_branch AS "associatedWorktreeBranch", + associated_worktree_ref AS "associatedWorktreeRef", + handoff_json AS "handoffJson", + fork_source_thread_id AS "forkSourceThreadId", + pending_approval_count AS "pendingApprovalCount", + pending_user_input_count AS "pendingUserInputCount", + has_actionable_proposed_plan AS "hasActionableProposedPlan" + FROM projection_threads + WHERE thread_id = 'thread-1' + `; + + assert.deepStrictEqual(repairedThreadRows, [ + { + envMode: "worktree", + associatedWorktreePath: "/repo/project/.worktrees/feature-test", + associatedWorktreeBranch: "feature/test", + associatedWorktreeRef: "feature/test", + handoffJson: null, + forkSourceThreadId: null, + pendingApprovalCount: 0, + pendingUserInputCount: 0, + hasActionableProposedPlan: 0, + }, + ]); + + const repairedMessageRows = yield* sql<{ + readonly source: string | null; + readonly skillsJson: string | null; + readonly mentionsJson: string | null; + readonly dispatchMode: string | null; + }>` + SELECT + source, + skills_json AS "skillsJson", + mentions_json AS "mentionsJson", + dispatch_mode AS "dispatchMode" + FROM projection_thread_messages + WHERE message_id = 'message-1' + `; + + assert.deepStrictEqual(repairedMessageRows, [ + { + source: "native", + skillsJson: null, + mentionsJson: null, + dispatchMode: null, + }, + ]); + }), + ); +}); diff --git a/apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.ts b/apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.ts new file mode 100644 index 00000000..ac90ad05 --- /dev/null +++ b/apps/server/src/persistence/Migrations/031_RepairLegacyProjectionSchema.ts @@ -0,0 +1,148 @@ +// FILE: 031_RepairLegacyProjectionSchema.ts +// Purpose: Repairs projection schema drift for legacy desktop databases that already consumed +// later migration ids with a different migration list. +// Layer: Persistence migration + +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const projectionThreadsColumnExists = (columnName: string) => + sql<{ readonly exists: number }>` + SELECT EXISTS( + SELECT 1 + FROM pragma_table_info('projection_threads') + WHERE name = ${columnName} + ) AS "exists" + `.pipe(Effect.map(([row]) => row?.exists === 1)); + + const projectionThreadMessagesColumnExists = (columnName: string) => + sql<{ readonly exists: number }>` + SELECT EXISTS( + SELECT 1 + FROM pragma_table_info('projection_thread_messages') + WHERE name = ${columnName} + ) AS "exists" + `.pipe(Effect.map(([row]) => row?.exists === 1)); + + const ensureProjectionThreadsColumn = (columnName: string, definition: string) => + Effect.gen(function* () { + if (yield* projectionThreadsColumnExists(columnName)) { + return false; + } + yield* sql.unsafe(` + ALTER TABLE projection_threads + ADD COLUMN ${definition} + `); + return true; + }); + + const ensureProjectionThreadMessagesColumn = (columnName: string, definition: string) => + Effect.gen(function* () { + if (yield* projectionThreadMessagesColumnExists(columnName)) { + return false; + } + yield* sql.unsafe(` + ALTER TABLE projection_thread_messages + ADD COLUMN ${definition} + `); + return true; + }); + + yield* ensureProjectionThreadsColumn("handoff_json", "handoff_json TEXT"); + yield* ensureProjectionThreadsColumn("env_mode", "env_mode TEXT NOT NULL DEFAULT 'local'"); + yield* ensureProjectionThreadsColumn("fork_source_thread_id", "fork_source_thread_id TEXT"); + yield* ensureProjectionThreadsColumn("associated_worktree_path", "associated_worktree_path TEXT"); + yield* ensureProjectionThreadsColumn( + "associated_worktree_branch", + "associated_worktree_branch TEXT", + ); + yield* ensureProjectionThreadsColumn("associated_worktree_ref", "associated_worktree_ref TEXT"); + yield* ensureProjectionThreadsColumn("archived_at", "archived_at TEXT"); + yield* ensureProjectionThreadsColumn("parent_thread_id", "parent_thread_id TEXT"); + yield* ensureProjectionThreadsColumn("subagent_agent_id", "subagent_agent_id TEXT"); + yield* ensureProjectionThreadsColumn("subagent_nickname", "subagent_nickname TEXT"); + yield* ensureProjectionThreadsColumn("subagent_role", "subagent_role TEXT"); + yield* ensureProjectionThreadsColumn("latest_user_message_at", "latest_user_message_at TEXT"); + yield* ensureProjectionThreadsColumn( + "pending_approval_count", + "pending_approval_count INTEGER NOT NULL DEFAULT 0", + ); + yield* ensureProjectionThreadsColumn( + "pending_user_input_count", + "pending_user_input_count INTEGER NOT NULL DEFAULT 0", + ); + yield* ensureProjectionThreadsColumn( + "has_actionable_proposed_plan", + "has_actionable_proposed_plan INTEGER NOT NULL DEFAULT 0", + ); + yield* ensureProjectionThreadsColumn("last_known_pr_json", "last_known_pr_json TEXT"); + + yield* ensureProjectionThreadMessagesColumn("source", "source TEXT NOT NULL DEFAULT 'native'"); + yield* ensureProjectionThreadMessagesColumn("skills_json", "skills_json TEXT"); + yield* ensureProjectionThreadMessagesColumn("mentions_json", "mentions_json TEXT"); + yield* ensureProjectionThreadMessagesColumn("dispatch_mode", "dispatch_mode TEXT"); + + yield* sql` + UPDATE projection_threads + SET env_mode = CASE + WHEN worktree_path IS NOT NULL THEN 'worktree' + ELSE 'local' + END + WHERE env_mode IS NULL + OR env_mode NOT IN ('local', 'worktree') + `; + + yield* sql` + UPDATE projection_threads + SET associated_worktree_path = worktree_path + WHERE associated_worktree_path IS NULL + AND worktree_path IS NOT NULL + `; + + yield* sql` + UPDATE projection_threads + SET associated_worktree_branch = branch + WHERE associated_worktree_branch IS NULL + AND branch IS NOT NULL + `; + + yield* sql` + UPDATE projection_threads + SET associated_worktree_ref = COALESCE(associated_worktree_branch, branch) + WHERE associated_worktree_ref IS NULL + AND COALESCE(associated_worktree_branch, branch) IS NOT NULL + `; + + yield* sql` + UPDATE projection_threads + SET pending_approval_count = 0 + WHERE pending_approval_count IS NULL + `; + + yield* sql` + UPDATE projection_threads + SET pending_user_input_count = 0 + WHERE pending_user_input_count IS NULL + `; + + yield* sql` + UPDATE projection_threads + SET has_actionable_proposed_plan = 0 + WHERE has_actionable_proposed_plan IS NULL + `; + + yield* sql` + UPDATE projection_thread_messages + SET source = 'native' + WHERE source IS NULL + OR TRIM(source) = '' + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_threads_parent_thread_id + ON projection_threads(parent_thread_id) + `; +}); diff --git a/apps/server/src/providerUsageSnapshot.ts b/apps/server/src/providerUsageSnapshot.ts new file mode 100644 index 00000000..82db131d --- /dev/null +++ b/apps/server/src/providerUsageSnapshot.ts @@ -0,0 +1,609 @@ +import type { Dirent, Stats } from "node:fs"; +import fs from "node:fs/promises"; +import nodePath from "node:path"; + +import type { + ProviderKind, + ServerGetProviderUsageSnapshotInput, + ServerGetProviderUsageSnapshotResult, + ServerProviderUsageLimit, + ServerProviderUsageLine, +} from "@t3tools/contracts"; +import { Effect } from "effect"; + +import { ServerConfig } from "./config"; + +const LOOKBACK_DAYS = 7; +const ONE_DAY_MS = 24 * 60 * 60 * 1000; +const LOOKBACK_MS = LOOKBACK_DAYS * ONE_DAY_MS; +const USAGE_CACHE_TTL_MS = 30_000; +const MAX_RECENT_USAGE_FILES = 200; + +type UsageSnapshot = Exclude; + +interface CachedUsageSnapshot { + expiresAtMs: number; + value: ServerGetProviderUsageSnapshotResult; + pending: Promise | null; +} + +interface CodexSessionSummary { + timestampMs: number; + totalTokens: number; + limits: ReadonlyArray; +} + +interface ClaudeUsageSample { + sessionId: string; + timestampMs: number; + totalTokens: number; + model: string | null; +} + +const usageSnapshotCache = new Map(); + +function asRecord(value: unknown): Record | null { + return value && typeof value === "object" ? (value as Record) : null; +} + +function asFiniteNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function asNonNegativeNumber(value: unknown): number | undefined { + const parsed = asFiniteNumber(value); + return parsed !== undefined && parsed >= 0 ? parsed : undefined; +} + +function asString(value: unknown): string | undefined { + return typeof value === "string" && value.trim().length > 0 ? value.trim() : undefined; +} + +function parseTimestampMs(value: unknown): number | null { + if (typeof value !== "string" || value.trim().length === 0) { + return null; + } + const timestampMs = Date.parse(value); + return Number.isFinite(timestampMs) ? timestampMs : null; +} + +function toIsoString(timestampMs: number): string { + return new Date(timestampMs).toISOString(); +} + +function formatCompactNumber(value: number): string { + const absoluteValue = Math.abs(value); + if (absoluteValue < 1_000) { + return new Intl.NumberFormat(undefined, { maximumFractionDigits: 0 }).format(value); + } + return new Intl.NumberFormat(undefined, { + notation: "compact", + maximumFractionDigits: absoluteValue < 1_000_000 ? 1 : 0, + }).format(value); +} + +function formatTokenValue(tokens: number): string { + return `${formatCompactNumber(tokens)} tokens`; +} + +function formatRecentSessionsSubtitle(sessionCount: number): string | undefined { + if (sessionCount <= 0) { + return undefined; + } + return `${new Intl.NumberFormat(undefined).format(sessionCount)} recent ${sessionCount === 1 ? "session" : "sessions"}`; +} + +function formatUsageTimestamp(timestampMs: number): string { + return new Intl.DateTimeFormat(undefined, { + day: "numeric", + hour: "2-digit", + minute: "2-digit", + month: "short", + }).format(timestampMs); +} + +async function safeReadDir(path: string): Promise> { + try { + return await fs.readdir(path, { withFileTypes: true }); + } catch { + return []; + } +} + +async function safeStat(path: string): Promise { + try { + return await fs.stat(path); + } catch { + return null; + } +} + +async function listRecentFiles(paths: ReadonlyArray): Promise> { + const filesWithStats = await Promise.all( + paths.map(async (path) => ({ + path, + mtimeMs: (await safeStat(path))?.mtimeMs ?? 0, + })), + ); + + return filesWithStats + .sort((left, right) => right.mtimeMs - left.mtimeMs) + .slice(0, MAX_RECENT_USAGE_FILES) + .map((entry) => entry.path); +} + +function buildUsageLines(input: { + tokens24h: number; + tokens7d: number; + sessions24h: number; + sessions7d: number; + latestTimestampMs: number; + latestSubtitle?: string; +}): ReadonlyArray { + return [ + { + label: "24h", + value: formatTokenValue(input.tokens24h), + ...(formatRecentSessionsSubtitle(input.sessions24h) + ? { subtitle: formatRecentSessionsSubtitle(input.sessions24h) } + : {}), + }, + { + label: "7d", + value: formatTokenValue(input.tokens7d), + ...(formatRecentSessionsSubtitle(input.sessions7d) + ? { subtitle: formatRecentSessionsSubtitle(input.sessions7d) } + : {}), + }, + { + label: "Latest", + value: formatUsageTimestamp(input.latestTimestampMs), + ...(input.latestSubtitle ? { subtitle: input.latestSubtitle } : {}), + }, + ]; +} + +function normalizeCodexUsageLimits(value: unknown): ReadonlyArray { + const rateLimits = asRecord(value); + if (!rateLimits) { + return []; + } + + const parseLimit = ( + label: string, + source: Record | null, + ): ServerProviderUsageLimit | null => { + if (!source) { + return null; + } + + const usedPercent = asNonNegativeNumber(source.used_percent ?? source.usedPercent); + const windowDurationMins = asNonNegativeNumber(source.window_minutes ?? source.windowMinutes); + const resetsAt = + asString(source.resets_at ?? source.resetsAt) ?? + asString(source.next_reset_at ?? source.nextResetAt); + if (usedPercent === undefined && windowDurationMins === undefined && !resetsAt) { + return null; + } + + return { + window: label, + ...(usedPercent !== undefined ? { usedPercent } : {}), + ...(windowDurationMins !== undefined ? { windowDurationMins } : {}), + ...(resetsAt ? { resetsAt } : {}), + }; + }; + + const primary = parseLimit("5h", asRecord(rateLimits.primary)); + const secondary = parseLimit("Weekly", asRecord(rateLimits.secondary)); + + return [primary, secondary].filter((limit): limit is ServerProviderUsageLimit => limit !== null); +} + +function readCodexTotalTokens(payload: Record): number { + const info = asRecord(payload.info); + const totalUsage = + asRecord(info?.total_token_usage) ?? + asRecord(info?.totalTokenUsage) ?? + asRecord(info?.total) ?? + asRecord(payload.total_token_usage) ?? + asRecord(payload.totalTokenUsage) ?? + asRecord(payload.total); + + return ( + asNonNegativeNumber(totalUsage?.total_tokens) ?? + asNonNegativeNumber(totalUsage?.totalTokens) ?? + asNonNegativeNumber(info?.total_tokens) ?? + asNonNegativeNumber(info?.totalTokens) ?? + asNonNegativeNumber(payload.total_tokens) ?? + asNonNegativeNumber(payload.totalTokens) ?? + 0 + ); +} + +async function listRecentCodexSessionFiles(sessionsRoot: string): Promise> { + const now = new Date(); + const candidates: string[] = []; + + for (let offset = 0; offset <= LOOKBACK_DAYS; offset += 1) { + const current = new Date(now); + current.setDate(now.getDate() - offset); + const dayDir = nodePath.join( + sessionsRoot, + `${current.getFullYear()}`, + `${String(current.getMonth() + 1).padStart(2, "0")}`, + `${String(current.getDate()).padStart(2, "0")}`, + ); + const entries = await safeReadDir(dayDir); + for (const entry of entries) { + if (entry.isFile() && entry.name.endsWith(".jsonl")) { + candidates.push(nodePath.join(dayDir, entry.name)); + } + } + } + + return listRecentFiles(candidates); +} + +async function readCodexSessionSummary(path: string): Promise { + let fileContents: string; + try { + fileContents = await fs.readFile(path, "utf8"); + } catch { + return null; + } + + let latestSummary: CodexSessionSummary | null = null; + const lines = fileContents.split(/\r?\n/u); + for (const line of lines) { + if (!line.trim()) { + continue; + } + + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + continue; + } + + const record = asRecord(parsed); + if (!record || record.type !== "event_msg") { + continue; + } + + const payload = asRecord(record.payload); + if (!payload || payload.type !== "token_count") { + continue; + } + + const timestampMs = parseTimestampMs(record.timestamp ?? payload.timestamp); + if (timestampMs === null) { + continue; + } + + const summary = { + timestampMs, + totalTokens: readCodexTotalTokens(payload), + limits: normalizeCodexUsageLimits(payload.rate_limits ?? payload.rateLimits), + } satisfies CodexSessionSummary; + + if (!latestSummary || summary.timestampMs > latestSummary.timestampMs) { + latestSummary = summary; + } + } + + return latestSummary; +} + +function readClaudeTotalTokens(value: unknown): number { + const usage = asRecord(value); + if (!usage) { + return 0; + } + + const inputTokens = + (asNonNegativeNumber(usage.input_tokens) ?? 0) + + (asNonNegativeNumber(usage.cache_creation_input_tokens) ?? 0) + + (asNonNegativeNumber(usage.cache_read_input_tokens) ?? 0); + const outputTokens = asNonNegativeNumber(usage.output_tokens) ?? 0; + return asNonNegativeNumber(usage.total_tokens) ?? inputTokens + outputTokens; +} + +function readClaudeAssistantSample(input: { + record: Record; + fallbackKey: string; +}): { dedupeKey: string; sample: ClaudeUsageSample } | null { + if (input.record.type !== "assistant") { + return null; + } + + const message = asRecord(input.record.message); + const usage = asRecord(message?.usage); + const totalTokens = readClaudeTotalTokens(usage); + const timestampMs = parseTimestampMs(input.record.timestamp); + if (!usage || totalTokens <= 0 || timestampMs === null) { + return null; + } + + const sessionId = asString(input.record.sessionId) ?? input.fallbackKey; + const model = asString(message?.model) ?? null; + const dedupeKey = + `${sessionId}:assistant:` + + (asString(input.record.requestId) ?? + asString(message?.id) ?? + asString(input.record.uuid) ?? + input.fallbackKey); + + return { + dedupeKey, + sample: { + sessionId, + timestampMs, + totalTokens, + model, + }, + }; +} + +function readClaudeToolResultSample(input: { + record: Record; + fallbackKey: string; +}): { dedupeKey: string; sample: ClaudeUsageSample } | null { + const toolUseResult = asRecord(input.record.toolUseResult); + const usage = asRecord(toolUseResult?.usage); + const totalTokens = readClaudeTotalTokens(usage); + const timestampMs = parseTimestampMs(input.record.timestamp); + if (!toolUseResult || !usage || totalTokens <= 0 || timestampMs === null) { + return null; + } + + const sessionId = asString(input.record.sessionId) ?? input.fallbackKey; + const dedupeKey = + `${sessionId}:tool-result:` + + (asString(input.record.uuid) ?? + asString(toolUseResult.agentId) ?? + asString(input.record.requestId) ?? + input.fallbackKey); + + return { + dedupeKey, + sample: { + sessionId, + timestampMs, + totalTokens, + model: null, + }, + }; +} + +async function listRecentClaudeTranscriptFiles( + projectsRoot: string, +): Promise> { + const candidates: string[] = []; + const projectEntries = await safeReadDir(projectsRoot); + + for (const projectEntry of projectEntries) { + if (!projectEntry.isDirectory()) { + continue; + } + + const projectDir = nodePath.join(projectsRoot, projectEntry.name); + const transcriptEntries = await safeReadDir(projectDir); + for (const transcriptEntry of transcriptEntries) { + if (transcriptEntry.isFile() && transcriptEntry.name.endsWith(".jsonl")) { + candidates.push(nodePath.join(projectDir, transcriptEntry.name)); + } + } + } + + return listRecentFiles(candidates); +} + +async function readClaudeUsageSamples(path: string): Promise> { + let fileContents: string; + try { + fileContents = await fs.readFile(path, "utf8"); + } catch { + return []; + } + + const samples: ClaudeUsageSample[] = []; + const seenKeys = new Set(); + const lines = fileContents.split(/\r?\n/u); + + for (let index = 0; index < lines.length; index += 1) { + const line = lines[index]; + if (!line || !line.trim()) { + continue; + } + + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + continue; + } + + const record = asRecord(parsed); + if (!record) { + continue; + } + + const fallbackKey = `${path}:${index}`; + const assistantSample = readClaudeAssistantSample({ record, fallbackKey }); + if (assistantSample && !seenKeys.has(assistantSample.dedupeKey)) { + seenKeys.add(assistantSample.dedupeKey); + samples.push(assistantSample.sample); + } + + const toolResultSample = readClaudeToolResultSample({ record, fallbackKey }); + if (toolResultSample && !seenKeys.has(toolResultSample.dedupeKey)) { + seenKeys.add(toolResultSample.dedupeKey); + samples.push(toolResultSample.sample); + } + } + + return samples; +} + +async function loadCodexUsageSnapshot(input: { + homeDir: string; + homePath?: string; +}): Promise { + const codexHomeDir = + input.homePath?.trim() || process.env.CODEX_HOME || nodePath.join(input.homeDir, ".codex"); + const sessionsRoot = nodePath.join(codexHomeDir, "sessions"); + const sessionFiles = await listRecentCodexSessionFiles(sessionsRoot); + if (sessionFiles.length === 0) { + return null; + } + + const sessionSummaries: CodexSessionSummary[] = []; + for (const sessionFile of sessionFiles) { + const summary = await readCodexSessionSummary(sessionFile); + if (summary) { + sessionSummaries.push(summary); + } + } + + if (sessionSummaries.length === 0) { + return null; + } + + const latestSummary = sessionSummaries.reduce((latest, current) => + current.timestampMs > latest.timestampMs ? current : latest, + ); + const nowMs = Date.now(); + const cutoff24h = nowMs - ONE_DAY_MS; + const cutoff7d = nowMs - LOOKBACK_MS; + + const recent24h = sessionSummaries.filter((summary) => summary.timestampMs >= cutoff24h); + const recent7d = sessionSummaries.filter((summary) => summary.timestampMs >= cutoff7d); + + return { + provider: "codex", + updatedAt: toIsoString(latestSummary.timestampMs), + limits: latestSummary.limits, + usageLines: buildUsageLines({ + tokens24h: recent24h.reduce((total, summary) => total + summary.totalTokens, 0), + tokens7d: recent7d.reduce((total, summary) => total + summary.totalTokens, 0), + sessions24h: recent24h.length, + sessions7d: recent7d.length, + latestTimestampMs: latestSummary.timestampMs, + latestSubtitle: "local Codex archive", + }), + source: "codex-session-archive", + }; +} + +async function loadClaudeUsageSnapshot(input: { homeDir: string }): Promise { + const projectsRoot = nodePath.join(input.homeDir, ".claude", "projects"); + const transcriptFiles = await listRecentClaudeTranscriptFiles(projectsRoot); + if (transcriptFiles.length === 0) { + return null; + } + + const usageSamples: ClaudeUsageSample[] = []; + for (const transcriptFile of transcriptFiles) { + usageSamples.push(...(await readClaudeUsageSamples(transcriptFile))); + } + + if (usageSamples.length === 0) { + return null; + } + + const nowMs = Date.now(); + const cutoff24h = nowMs - ONE_DAY_MS; + const cutoff7d = nowMs - LOOKBACK_MS; + const recent24h = usageSamples.filter((sample) => sample.timestampMs >= cutoff24h); + const recent7d = usageSamples.filter((sample) => sample.timestampMs >= cutoff7d); + const latestSample = usageSamples.reduce((latest, current) => + current.timestampMs > latest.timestampMs ? current : latest, + ); + + return { + provider: "claudeAgent", + updatedAt: toIsoString(latestSample.timestampMs), + limits: [], + usageLines: buildUsageLines({ + tokens24h: recent24h.reduce((total, sample) => total + sample.totalTokens, 0), + tokens7d: recent7d.reduce((total, sample) => total + sample.totalTokens, 0), + sessions24h: new Set(recent24h.map((sample) => sample.sessionId)).size, + sessions7d: new Set(recent7d.map((sample) => sample.sessionId)).size, + latestTimestampMs: latestSample.timestampMs, + latestSubtitle: latestSample.model ?? "local Claude transcripts", + }), + source: "claude-project-transcripts", + }; +} + +async function loadProviderUsageSnapshot(input: { + provider: ProviderKind; + homeDir: string; + homePath?: string; +}): Promise { + switch (input.provider) { + case "codex": + return loadCodexUsageSnapshot({ + homeDir: input.homeDir, + ...(input.homePath ? { homePath: input.homePath } : {}), + }); + case "claudeAgent": + return loadClaudeUsageSnapshot({ homeDir: input.homeDir }); + case "gemini": + default: + return null; + } +} + +async function getCachedProviderUsageSnapshot(input: { + provider: ProviderKind; + homeDir: string; + homePath?: string; +}): Promise { + const cacheKey = `${input.provider}:${input.homeDir}:${input.homePath?.trim() ?? ""}`; + const nowMs = Date.now(); + const existing = usageSnapshotCache.get(cacheKey); + + if (existing && existing.expiresAtMs > nowMs) { + return existing.value; + } + if (existing?.pending) { + return existing.pending; + } + + const pending = loadProviderUsageSnapshot(input) + .catch(() => null) + .then((value) => { + usageSnapshotCache.set(cacheKey, { + expiresAtMs: Date.now() + USAGE_CACHE_TTL_MS, + value, + pending: null, + }); + return value; + }); + + usageSnapshotCache.set(cacheKey, { + expiresAtMs: existing?.expiresAtMs ?? 0, + value: existing?.value ?? null, + pending, + }); + + return pending; +} + +export const getProviderUsageSnapshot = Effect.fn(function* ( + input: ServerGetProviderUsageSnapshotInput, +) { + const serverConfig = yield* ServerConfig; + return yield* Effect.tryPromise({ + try: () => + getCachedProviderUsageSnapshot({ + provider: input.provider, + homeDir: serverConfig.homeDir, + ...(input.homePath ? { homePath: input.homePath } : {}), + }), + catch: () => null, + }); +}); diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index e09dad55..d03c6f0c 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -97,6 +97,8 @@ import { makeServerReadiness } from "./wsServer/readiness.ts"; import { decodeJsonResult, formatSchemaError } from "@t3tools/shared/schemaJson"; import { workspaceRootsEqual } from "@t3tools/shared/threadWorkspace"; import { TerminalThreadTitleTracker } from "./terminal/terminalThreadTitleTracker"; +import { importLegacyT3State } from "./legacyStateImport.ts"; +import { getProviderUsageSnapshot } from "./providerUsageSnapshot"; /** * ServerShape - Service API for server lifecycle control. @@ -176,6 +178,9 @@ function isThreadDetailEvent(event: OrchestrationEvent): event is Extract< { type: | "thread.message-sent" + | "thread.messages-imported" + | "thread.proposed-plans-imported" + | "thread.activities-imported" | "thread.proposed-plan-upserted" | "thread.activity-appended" | "thread.turn-diff-completed" @@ -185,6 +190,9 @@ function isThreadDetailEvent(event: OrchestrationEvent): event is Extract< > { return ( event.type === "thread.message-sent" || + event.type === "thread.messages-imported" || + event.type === "thread.proposed-plans-imported" || + event.type === "thread.activities-imported" || event.type === "thread.proposed-plan-upserted" || event.type === "thread.activity-appended" || event.type === "thread.turn-diff-completed" || @@ -1526,6 +1534,23 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< return { threadId: thread.id }; } + case ORCHESTRATION_WS_METHODS.importLegacyT3State: { + const body = stripRequestTag(request.body); + return yield* importLegacyT3State( + body.sourceBaseDir === undefined ? {} : { sourceBaseDir: body.sourceBaseDir }, + ).pipe( + Effect.mapError( + (cause) => + new RouteRequestError({ + message: + cause instanceof Error && cause.message.length > 0 + ? cause.message + : "Failed to import legacy T3 Code state.", + }), + ), + ); + } + case ORCHESTRATION_WS_METHODS.getTurnDiff: { const body = stripRequestTag(request.body); return yield* checkpointDiffQuery.getTurnDiff(body); @@ -1805,6 +1830,11 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< worktrees: yield* listManagedWorktrees(), }; + case WS_METHODS.serverGetProviderUsageSnapshot: { + const body = stripRequestTag(request.body); + return yield* getProviderUsageSnapshot(body); + } + case WS_METHODS.serverTranscribeVoice: { const body = stripRequestTag(request.body); const adapter = yield* providerAdapterRegistry.getByProvider(body.provider); diff --git a/apps/web/src/appSettings.ts b/apps/web/src/appSettings.ts index f1a9a5ff..3c731cc0 100644 --- a/apps/web/src/appSettings.ts +++ b/apps/web/src/appSettings.ts @@ -74,6 +74,7 @@ export const AppSettingsSchema = Schema.Struct({ enableAssistantStreaming: Schema.Boolean.pipe(withDefaults(() => false)), enableTaskCompletionToasts: Schema.Boolean.pipe(withDefaults(() => true)), enableSystemTaskCompletionNotifications: Schema.Boolean.pipe(withDefaults(() => true)), + showComposerUsageBadge: Schema.Boolean.pipe(withDefaults(() => false)), sidebarSide: SidebarSide.pipe(withDefaults(() => DEFAULT_SIDEBAR_SIDE)), sidebarProjectSortOrder: SidebarProjectSortOrder.pipe( withDefaults(() => DEFAULT_SIDEBAR_PROJECT_SORT_ORDER), diff --git a/apps/web/src/components/BranchToolbar.tsx b/apps/web/src/components/BranchToolbar.tsx index f460ff3e..bcab4621 100644 --- a/apps/web/src/components/BranchToolbar.tsx +++ b/apps/web/src/components/BranchToolbar.tsx @@ -4,15 +4,17 @@ import type { ThreadId, RuntimeMode } from "@t3tools/contracts"; import { deriveAssociatedWorktreeMetadata } from "@t3tools/shared/threadWorkspace"; import { LuSplit } from "react-icons/lu"; -import { ChevronDownIcon, ChevronRightIcon, ExternalLinkIcon, HandoffIcon } from "~/lib/icons"; +import { ChevronDownIcon, ChevronRightIcon, HandoffIcon } from "~/lib/icons"; import { FiThumbsUp } from "react-icons/fi"; import { HiOutlineHandRaised } from "react-icons/hi2"; import { PiLaptop } from "react-icons/pi"; import { useCallback, useMemo, useRef, useState } from "react"; +import { useAppSettings } from "~/appSettings"; import { newCommandId, cn } from "../lib/utils"; import { readNativeApi } from "../nativeApi"; import { useComposerDraftStore } from "../composerDraftStore"; +import { useProviderUsageSummary } from "../hooks/useProviderUsageSummary"; import { resolveThreadEnvironmentPresentation } from "../lib/threadEnvironment"; import { useStore } from "../store"; import { @@ -28,11 +30,10 @@ import { import { BranchToolbarBranchSelector } from "./BranchToolbarBranchSelector"; import { ContextWindowMeter } from "./chat/ContextWindowMeter"; import type { ContextWindowSnapshot } from "../lib/contextWindow"; +import { ProviderUsagePanelContent } from "./ProviderUsagePanelContent"; import { Popover, PopoverPopup, PopoverTrigger } from "./ui/popover"; import { Collapsible, CollapsiblePanel, CollapsibleTrigger } from "./ui/collapsible"; import type { ThreadWorkspacePatch } from "../types"; -import { deriveAccountRateLimits, deriveRateLimitLearnMoreHref } from "~/lib/rateLimits"; -import { RateLimitSummaryList } from "./RateLimitSummaryList"; function WorktreeGlyph({ className }: { className?: string }) { return ; @@ -77,6 +78,7 @@ export default function BranchToolbar({ const draftThread = useComposerDraftStore((store) => store.getDraftThread(threadId)); const setDraftThreadContext = useComposerDraftStore((store) => store.setDraftThreadContext); const threads = useStore(useRef(createAllThreadsSelector()).current); + const { settings } = useAppSettings(); const serverThread = useStore(useMemo(() => createThreadSelector(threadId), [threadId])); const activeProjectId = serverThread?.projectId ?? draftThread?.projectId ?? null; @@ -191,13 +193,11 @@ export default function BranchToolbar({ const canSwitchToLocal = Boolean(!envLocked && effectiveEnvMode === "worktree"); const showEnvPicker = effectiveEnvMode === "local" || canSwitchToLocal; - const rateLimits = useMemo(() => { - const derived = deriveAccountRateLimits(threads); - return activeProvider - ? derived.filter((rateLimit) => rateLimit.provider === activeProvider) - : derived; - }, [activeProvider, threads]); - const learnMoreHref = useMemo(() => deriveRateLimitLearnMoreHref(rateLimits), [rateLimits]); + const usageSummary = useProviderUsageSummary({ + provider: activeProvider, + threads, + codexHomePath: settings.codexHomePath || null, + }); const [rateLimitsOpen, setRateLimitsOpen] = useState(true); const [envPickerOpen, setEnvPickerOpen] = useState(false); @@ -347,20 +347,15 @@ export default function BranchToolbar({ /> -
- - {learnMoreHref ? ( - - Learn more - - - ) : null} -
+
diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index bc72eb9d..8734c6fb 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -95,7 +95,11 @@ import { replaceTextRange, stripComposerTriggerText, } from "../composer-logic"; -import { createProjectSelector, createThreadSelector } from "../storeSelectors"; +import { + createAllThreadsSelector, + createProjectSelector, + createThreadSelector, +} from "../storeSelectors"; import { canOfferForkSlashCommand, canOfferReviewSlashCommand, @@ -150,6 +154,7 @@ import { useTheme } from "../hooks/useTheme"; import { useThreadWorkspaceHandoff } from "../hooks/useThreadWorkspaceHandoff"; import { useComposerCommandMenuItems } from "../hooks/useComposerCommandMenuItems"; import { useThreadHandoff } from "../hooks/useThreadHandoff"; +import { useProviderUsageSummary } from "../hooks/useProviderUsageSummary"; import { useTurnDiffSummaries } from "../hooks/useTurnDiffSummaries"; import BranchToolbar from "./BranchToolbar"; import { ThreadWorktreeHandoffDialog } from "./ThreadWorktreeHandoffDialog"; @@ -255,6 +260,7 @@ import { ComposerExtrasMenu } from "./chat/ComposerExtrasMenu"; import { ComposerPendingApprovalPanel } from "./chat/ComposerPendingApprovalPanel"; import { ComposerPendingUserInputPanel } from "./chat/ComposerPendingUserInputPanel"; import { ComposerPlanFollowUpBanner } from "./chat/ComposerPlanFollowUpBanner"; +import { ComposerUsageBadge } from "./chat/ComposerUsageBadge"; import { ComposerVoiceButton } from "./chat/ComposerVoiceButton"; import { ComposerVoiceRecorderBar } from "./chat/ComposerVoiceRecorderBar"; import { ComposerReferenceAttachments } from "./chat/ComposerReferenceAttachments"; @@ -934,6 +940,7 @@ export default function ChatView({ [draftThread, fallbackDraftProject?.defaultModelSelection, localDraftError, threadId], ); const activeThread = serverThread ?? localDraftThread; + const allThreads = useStore(useMemo(() => createAllThreadsSelector(), [])); const runtimeMode = composerDraft.runtimeMode ?? activeThread?.runtimeMode ?? DEFAULT_RUNTIME_MODE; const interactionMode = @@ -5380,6 +5387,12 @@ export default function ChatView({ shortcutLabel: traitsPickerShortcutLabel, onPromptChange: setPromptFromTraits, }); + const showComposerUsageBadge = settings.showComposerUsageBadge; + const composerUsageSummary = useProviderUsageSummary({ + provider: showComposerUsageBadge ? selectedProvider : null, + threads: allThreads, + codexHomePath: settings.codexHomePath || null, + }); const toggleFastMode = useCallback(() => { if (!composerTraitSelection.caps.supportsFastMode) { scheduleComposerFocus(); @@ -6393,13 +6406,27 @@ export default function ChatView({ onProviderModelChange={onProviderModelSelect} /> - {providerTraitsPicker ? ( + {providerTraitsPicker || showComposerUsageBadge ? ( <> {providerTraitsPicker} + {providerTraitsPicker && showComposerUsageBadge ? ( + + ) : null} + {showComposerUsageBadge ? ( + + ) : null} ) : null} @@ -7110,13 +7137,27 @@ export default function ChatView({ onProviderModelChange={onProviderModelSelect} /> - {providerTraitsPicker ? ( + {providerTraitsPicker || showComposerUsageBadge ? ( <> {providerTraitsPicker} + {providerTraitsPicker && showComposerUsageBadge ? ( + + ) : null} + {showComposerUsageBadge ? ( + + ) : null} ) : null} diff --git a/apps/web/src/components/ProviderUsagePanelContent.tsx b/apps/web/src/components/ProviderUsagePanelContent.tsx new file mode 100644 index 00000000..7b5b7cdd --- /dev/null +++ b/apps/web/src/components/ProviderUsagePanelContent.tsx @@ -0,0 +1,90 @@ +import type { ProviderKind } from "@t3tools/contracts"; +import { memo, useMemo } from "react"; + +import { ExternalLinkIcon } from "~/lib/icons"; +import type { OpenUsageUsageLine } from "~/lib/openUsageRateLimits"; +import { + deriveProviderUsageLearnMoreHref, + deriveRateLimitLearnMoreHref, + deriveVisibleRateLimitRows, + type ProviderRateLimit, +} from "~/lib/rateLimits"; +import { cn } from "~/lib/utils"; + +import { RateLimitSummaryList } from "./RateLimitSummaryList"; + +export function providerUsageLabel(provider: ProviderKind | null | undefined): string { + if (provider === "codex") return "Codex usage"; + if (provider === "claudeAgent") return "Claude usage"; + if (provider === "gemini") return "Gemini usage"; + return "Usage"; +} + +export const ProviderUsagePanelContent = memo(function ProviderUsagePanelContent(props: { + provider: ProviderKind | null | undefined; + rateLimits: ReadonlyArray; + usageLines?: ReadonlyArray | undefined; + isLoading?: boolean | undefined; + learnMoreHref?: string | null | undefined; + showTitle?: boolean | undefined; + className?: string | undefined; +}) { + const visibleRows = useMemo( + () => deriveVisibleRateLimitRows(props.rateLimits), + [props.rateLimits], + ); + const learnMoreHref = useMemo( + () => + props.learnMoreHref ?? + deriveRateLimitLearnMoreHref(props.rateLimits) ?? + deriveProviderUsageLearnMoreHref(props.provider), + [props.learnMoreHref, props.provider, props.rateLimits], + ); + + return ( +
+ {props.showTitle !== false ? ( +
+ {providerUsageLabel(props.provider)} +
+ ) : null} + {visibleRows.length > 0 ? : null} + {props.usageLines && props.usageLines.length > 0 ? ( +
+ {props.usageLines.map((line) => ( +
+
+ {line.label} + {line.value} +
+ {line.subtitle ? ( +
{line.subtitle}
+ ) : null} +
+ ))} +
+ ) : visibleRows.length === 0 && props.isLoading ? ( +

+ Scanning local usage data for the selected provider. +

+ ) : visibleRows.length === 0 ? ( +

+ {props.provider + ? "No local usage data was found yet for the selected provider." + : "No local usage data was found yet."} +

+ ) : null} + {learnMoreHref ? ( + + Learn more + + + ) : null} +
+ ); +}); diff --git a/apps/web/src/components/Sidebar.tsx b/apps/web/src/components/Sidebar.tsx index 18251d95..95e1e406 100644 --- a/apps/web/src/components/Sidebar.tsx +++ b/apps/web/src/components/Sidebar.tsx @@ -113,6 +113,7 @@ import { RenameThreadDialog } from "./RenameThreadDialog"; import { terminalRuntimeRegistry } from "./terminal/terminalRuntimeRegistry"; import { SidebarSearchPalette } from "./SidebarSearchPalette"; import { useHandleNewChat } from "../hooks/useHandleNewChat"; +import { useLegacyT3Import } from "../hooks/useLegacyT3Import"; import { useHandleNewThread } from "../hooks/useHandleNewThread"; import { useThreadHandoff } from "../hooks/useThreadHandoff"; import { selectThreadTerminalState, useTerminalStateStore } from "../terminalStateStore"; @@ -1100,6 +1101,8 @@ export default function Sidebar() { select: (config) => config.keybindings, }); const queryClient = useQueryClient(); + const { defaultLegacyT3BaseDir, importLegacyT3State, isImportingLegacyT3State } = + useLegacyT3Import(); const removeWorktreeMutation = useMutation(gitRemoveWorktreeMutationOptions({ queryClient })); const { activeProjectId: focusedProjectId } = useFocusedChatContext(); const [addingProject, setAddingProject] = useState(false); @@ -1849,6 +1852,17 @@ export default function Sidebar() { setAddingProject((prev) => !prev); }, []); + const handleImportLegacyProjects = useCallback(() => { + void importLegacyT3State({ + confirmationLines: [ + "Import from T3 Code?", + `Source: ${defaultLegacyT3BaseDir}`, + "This imports legacy projects and chats into the current DP Code profile.", + "Existing DP Code chats stay in place and duplicate thread ids are skipped.", + ], + }); + }, [defaultLegacyT3BaseDir, importLegacyT3State]); + // Send the shortcut straight to the native picker when desktop APIs are available. const handleShortcutAddProject = useCallback(() => { if (isElectron) { @@ -5573,8 +5587,31 @@ export default function Sidebar() { )} {standardProjects.length === 0 && !shouldShowProjectPathEntry && ( -
- No projects yet +
+
+

+ No projects yet +

+

+ Add a folder manually or import your existing T3 Code projects and chats. +

+
+ + +
+

+ Source: {defaultLegacyT3BaseDir} +

+
)} diff --git a/apps/web/src/components/chat/ComposerUsageBadge.tsx b/apps/web/src/components/chat/ComposerUsageBadge.tsx new file mode 100644 index 00000000..7b529c3c --- /dev/null +++ b/apps/web/src/components/chat/ComposerUsageBadge.tsx @@ -0,0 +1,93 @@ +// FILE: ComposerUsageBadge.tsx +// Purpose: Show the selected provider's remaining usage inline in the composer footer. +// Layer: Chat composer presentation +// Depends on: shared rate-limit derivation, global thread store, and shared popover/button styling. + +import { type ProviderKind } from "@t3tools/contracts"; +import { memo, useMemo } from "react"; + +import { ChevronDownIcon } from "~/lib/icons"; +import type { OpenUsageUsageLine } from "~/lib/openUsageRateLimits"; +import { + deriveVisibleRateLimitRows, + formatRateLimitRemainingPercent, + type ProviderRateLimit, +} from "~/lib/rateLimits"; +import { cn } from "~/lib/utils"; + +import { ProviderUsagePanelContent, providerUsageLabel } from "../ProviderUsagePanelContent"; +import { Button } from "../ui/button"; +import { Popover, PopoverPopup, PopoverTrigger } from "../ui/popover"; +import { COMPOSER_PICKER_TRIGGER_TEXT_CLASS_NAME } from "./composerPickerStyles"; + +function buildPrimaryUsageLabel(input: { + readonly label: string; + readonly remainingPercent: number; +}): string { + const remaining = formatRateLimitRemainingPercent(input.remainingPercent); + if (input.label === "Current") { + return `${remaining} left`; + } + return `${input.label} ${remaining} left`; +} + +function buildUsageLineLabel(input: OpenUsageUsageLine): string { + return `${input.label} ${input.value}`; +} + +export const ComposerUsageBadge = memo(function ComposerUsageBadge(props: { + provider: ProviderKind; + rateLimits: ReadonlyArray; + usageLines?: ReadonlyArray; + isLoading?: boolean; +}) { + const visibleRows = useMemo( + () => deriveVisibleRateLimitRows(props.rateLimits), + [props.rateLimits], + ); + const primaryRow = visibleRows[0] ?? null; + const primaryUsageLine = props.usageLines?.[0] ?? null; + const inlineLabel = primaryRow + ? buildPrimaryUsageLabel(primaryRow) + : primaryUsageLine + ? buildUsageLineLabel(primaryUsageLine) + : props.isLoading + ? "Loading usage..." + : providerUsageLabel(props.provider); + + return ( + + + } + > + + {inlineLabel} + + + + + + + ); +}); diff --git a/apps/web/src/hooks/useLegacyT3Import.ts b/apps/web/src/hooks/useLegacyT3Import.ts new file mode 100644 index 00000000..c52b4a18 --- /dev/null +++ b/apps/web/src/hooks/useLegacyT3Import.ts @@ -0,0 +1,97 @@ +import { useQuery, useQueryClient } from "@tanstack/react-query"; +import { useCallback, useMemo, useState } from "react"; + +import { toastManager } from "../components/ui/toast"; +import { serverConfigQueryOptions, serverQueryKeys } from "../lib/serverReactQuery"; +import { ensureNativeApi, readNativeApi } from "../nativeApi"; +import { useStore } from "../store"; + +export type LegacyT3ImportOutcome = "cancelled" | "completed" | "failed"; + +interface ImportLegacyT3StateOptions { + confirmationLines?: readonly string[]; + skipConfirmation?: boolean; +} + +function formatLegacyT3BaseDir(homeDir: string | null | undefined): string { + if (!homeDir) { + return "~/.t3 on macOS or %USERPROFILE%\\.t3 on Windows"; + } + + const separator = + homeDir.endsWith("\\") || homeDir.endsWith("/") ? "" : homeDir.includes("\\") ? "\\" : "/"; + + return `${homeDir}${separator}.t3`; +} + +export function useLegacyT3Import() { + const queryClient = useQueryClient(); + const syncServerReadModel = useStore((store) => store.syncServerReadModel); + const serverConfigQuery = useQuery(serverConfigQueryOptions()); + const [isImportingLegacyT3State, setIsImportingLegacyT3State] = useState(false); + + const defaultLegacyT3BaseDir = useMemo( + () => formatLegacyT3BaseDir(serverConfigQuery.data?.homeDir), + [serverConfigQuery.data?.homeDir], + ); + + const importLegacyT3State = useCallback( + async (options?: ImportLegacyT3StateOptions): Promise => { + if (isImportingLegacyT3State) { + return "cancelled"; + } + + const api = readNativeApi() ?? ensureNativeApi(); + if (!options?.skipConfirmation) { + const confirmed = await api.dialogs.confirm( + ( + options?.confirmationLines ?? [ + "Import legacy T3 Code data?", + `Source: ${defaultLegacyT3BaseDir}`, + "This merges legacy projects and chats into the current DP Code profile.", + "Existing DP Code chats stay in place and duplicate thread ids are skipped.", + ] + ).join("\n"), + ); + if (!confirmed) { + return "cancelled"; + } + } + + setIsImportingLegacyT3State(true); + try { + const result = await api.orchestration.importLegacyT3State({}); + const snapshot = await api.orchestration.getSnapshot(); + syncServerReadModel(snapshot); + await queryClient.invalidateQueries({ queryKey: serverQueryKeys.all }); + toastManager.add({ + type: "success", + title: "Legacy T3 data imported", + description: [ + `${result.importedProjects} projects imported, ${result.mappedProjects} matched`, + `${result.importedThreads} threads imported, ${result.skippedThreads} skipped`, + `${result.copiedAttachments} attachments copied`, + ].join(" | "), + }); + return "completed"; + } catch (error) { + toastManager.add({ + type: "error", + title: "Legacy import failed", + description: + error instanceof Error ? error.message : "Unable to import T3 Code projects and chats.", + }); + return "failed"; + } finally { + setIsImportingLegacyT3State(false); + } + }, + [defaultLegacyT3BaseDir, isImportingLegacyT3State, queryClient, syncServerReadModel], + ); + + return { + defaultLegacyT3BaseDir, + importLegacyT3State, + isImportingLegacyT3State, + }; +} diff --git a/apps/web/src/hooks/useProviderUsageSummary.ts b/apps/web/src/hooks/useProviderUsageSummary.ts new file mode 100644 index 00000000..96778773 --- /dev/null +++ b/apps/web/src/hooks/useProviderUsageSummary.ts @@ -0,0 +1,83 @@ +import type { OrchestrationThread, ProviderKind } from "@t3tools/contracts"; +import { useQuery } from "@tanstack/react-query"; +import { useMemo } from "react"; + +import { + normalizeOpenUsageSnapshot, + normalizeOpenUsageUsageLines, +} from "~/lib/openUsageRateLimits"; +import { openUsageProviderSnapshotQueryOptions } from "~/lib/openUsageReactQuery"; +import { + normalizeServerProviderUsageLines, + normalizeServerProviderUsageRateLimit, +} from "~/lib/providerUsageSnapshot"; +import { + deriveProviderUsageLearnMoreHref, + deriveRateLimitLearnMoreHref, + deriveAccountRateLimits, + mergeProviderRateLimits, + type ProviderRateLimit, +} from "~/lib/rateLimits"; +import { serverProviderUsageSnapshotQueryOptions } from "~/lib/serverReactQuery"; + +export function useProviderUsageSummary(input: { + provider: ProviderKind | null | undefined; + threads: ReadonlyArray>; + codexHomePath?: string | null; +}) { + const providerUsageSnapshotQuery = useQuery( + serverProviderUsageSnapshotQueryOptions({ + provider: input.provider, + homePath: input.provider === "codex" ? input.codexHomePath || null : null, + }), + ); + const openUsageSnapshotQuery = useQuery(openUsageProviderSnapshotQueryOptions(input.provider)); + + const rateLimits = useMemo>(() => { + const derivedRateLimits = deriveAccountRateLimits(input.threads).filter((rateLimit) => + input.provider ? rateLimit.provider === input.provider : true, + ); + const serverUsageRateLimit = normalizeServerProviderUsageRateLimit( + providerUsageSnapshotQuery.data, + ); + const openUsageSnapshot = normalizeOpenUsageSnapshot( + openUsageSnapshotQuery.data, + input.provider, + ); + return mergeProviderRateLimits( + derivedRateLimits, + mergeProviderRateLimits( + serverUsageRateLimit ? [serverUsageRateLimit] : [], + openUsageSnapshot ? [openUsageSnapshot] : [], + ), + ); + }, [input.provider, input.threads, openUsageSnapshotQuery.data, providerUsageSnapshotQuery.data]); + + const usageLines = useMemo(() => { + const serverUsageLines = normalizeServerProviderUsageLines(providerUsageSnapshotQuery.data); + if (serverUsageLines.length > 0) { + return serverUsageLines; + } + return normalizeOpenUsageUsageLines(openUsageSnapshotQuery.data); + }, [openUsageSnapshotQuery.data, providerUsageSnapshotQuery.data]); + + const learnMoreHref = useMemo( + () => + deriveRateLimitLearnMoreHref(rateLimits) ?? deriveProviderUsageLearnMoreHref(input.provider), + [input.provider, rateLimits], + ); + + const isLoading = + input.provider !== null && + input.provider !== undefined && + providerUsageSnapshotQuery.isPending && + rateLimits.length === 0 && + usageLines.length === 0; + + return { + isLoading, + learnMoreHref, + rateLimits, + usageLines, + } as const; +} diff --git a/apps/web/src/lib/providerUsageSnapshot.ts b/apps/web/src/lib/providerUsageSnapshot.ts new file mode 100644 index 00000000..d4d68132 --- /dev/null +++ b/apps/web/src/lib/providerUsageSnapshot.ts @@ -0,0 +1,39 @@ +import type { ServerGetProviderUsageSnapshotResult } from "@t3tools/contracts"; + +import type { OpenUsageUsageLine } from "./openUsageRateLimits"; +import type { ProviderRateLimit } from "./rateLimits"; + +export function normalizeServerProviderUsageRateLimit( + snapshot: ServerGetProviderUsageSnapshotResult | null | undefined, +): ProviderRateLimit | null { + if (!snapshot || snapshot.limits.length === 0) { + return null; + } + + return { + provider: snapshot.provider, + updatedAt: snapshot.updatedAt, + limits: snapshot.limits.map((limit) => ({ + window: limit.window, + ...(limit.usedPercent !== undefined ? { usedPercent: limit.usedPercent } : {}), + ...(limit.resetsAt ? { resetsAt: limit.resetsAt } : {}), + ...(limit.windowDurationMins !== undefined + ? { windowDurationMins: limit.windowDurationMins } + : {}), + })), + }; +} + +export function normalizeServerProviderUsageLines( + snapshot: ServerGetProviderUsageSnapshotResult | null | undefined, +): OpenUsageUsageLine[] { + if (!snapshot || snapshot.usageLines.length === 0) { + return []; + } + + return snapshot.usageLines.map((line) => ({ + label: line.label, + value: line.value, + ...(line.subtitle ? { subtitle: line.subtitle } : {}), + })); +} diff --git a/apps/web/src/lib/rateLimits.ts b/apps/web/src/lib/rateLimits.ts index 01f9f674..505b701c 100644 --- a/apps/web/src/lib/rateLimits.ts +++ b/apps/web/src/lib/rateLimits.ts @@ -380,6 +380,12 @@ export function deriveRateLimitLearnMoreHref( if (providers.size !== 1) return null; const [provider] = providers; + return deriveProviderUsageLearnMoreHref(provider); +} + +export function deriveProviderUsageLearnMoreHref( + provider: string | null | undefined, +): string | null { if (provider === "codex") return "https://platform.openai.com/usage"; if (provider === "claudeAgent") { return "https://docs.anthropic.com/en/docs/about-claude/models#rate-limits"; diff --git a/apps/web/src/lib/serverReactQuery.ts b/apps/web/src/lib/serverReactQuery.ts index 4a6970ed..3cc57cc4 100644 --- a/apps/web/src/lib/serverReactQuery.ts +++ b/apps/web/src/lib/serverReactQuery.ts @@ -1,3 +1,4 @@ +import type { ProviderKind } from "@t3tools/contracts"; import { queryOptions } from "@tanstack/react-query"; import { ensureNativeApi } from "~/nativeApi"; @@ -5,6 +6,8 @@ export const serverQueryKeys = { all: ["server"] as const, config: () => ["server", "config"] as const, worktrees: () => ["server", "worktrees"] as const, + providerUsage: (provider: ProviderKind | null | undefined, homePath?: string | null) => + ["server", "providerUsage", provider ?? null, homePath ?? null] as const, }; export function serverConfigQueryOptions() { @@ -30,3 +33,25 @@ export function serverWorktreesQueryOptions() { refetchOnReconnect: true, }); } + +export function serverProviderUsageSnapshotQueryOptions(input: { + provider: ProviderKind | null | undefined; + homePath?: string | null; +}) { + return queryOptions({ + queryKey: serverQueryKeys.providerUsage(input.provider, input.homePath), + enabled: input.provider !== null && input.provider !== undefined, + staleTime: 30_000, + refetchInterval: 30_000, + refetchOnWindowFocus: false, + retry: false, + queryFn: async () => { + if (!input.provider) return null; + const api = ensureNativeApi(); + return api.server.getProviderUsageSnapshot({ + provider: input.provider, + ...(input.homePath ? { homePath: input.homePath } : {}), + }); + }, + }); +} diff --git a/apps/web/src/routes/_chat.settings.tsx b/apps/web/src/routes/_chat.settings.tsx index 169ae78d..27669fd4 100644 --- a/apps/web/src/routes/_chat.settings.tsx +++ b/apps/web/src/routes/_chat.settings.tsx @@ -42,6 +42,7 @@ import { SidebarHeaderTrigger, SidebarInset } from "../components/ui/sidebar"; import { Tooltip, TooltipPopup, TooltipTrigger } from "../components/ui/tooltip"; import { resolveAndPersistPreferredEditor } from "../editorPreferences"; import { isElectron } from "../env"; +import { useLegacyT3Import } from "../hooks/useLegacyT3Import"; import { useTheme } from "../hooks/useTheme"; import { gitRemoveWorktreeMutationOptions } from "../lib/gitReactQuery"; import { @@ -271,6 +272,8 @@ function SettingsRouteView() { const serverWorktreesQuery = useQuery(serverWorktreesQueryOptions()); const removeWorktreeMutation = useMutation(gitRemoveWorktreeMutationOptions({ queryClient })); const syncServerReadModel = useStore((store) => store.syncServerReadModel); + const { defaultLegacyT3BaseDir, importLegacyT3State, isImportingLegacyT3State } = + useLegacyT3Import(); const threads = useStore(useMemo(() => createAllThreadsSelector(), [])); const projects = useStore((store) => store.projects); const threadsHydrated = useStore((store) => store.threadsHydrated); @@ -628,7 +631,7 @@ function SettingsRouteView() { const confirmed = await api.dialogs.confirm( [ "Repair local state?", - "This rebuilds local project indexes and refreshes project snapshots.", + "This rebuilds local project indexes, refreshes project snapshots, and consolidates duplicate inherited projects.", "It keeps existing chats in place, but it may take a moment.", ].join("\n"), ); @@ -643,7 +646,8 @@ function SettingsRouteView() { toastManager.add({ type: "success", title: "Local state repaired", - description: "Project indexes were rebuilt without clearing existing chats.", + description: + "Project indexes were rebuilt and duplicate local projects were consolidated without clearing existing chats.", }); } catch (error) { toastManager.add({ @@ -926,6 +930,34 @@ function SettingsRouteView() { } /> + + + updateSettings({ + showComposerUsageBadge: defaults.showComposerUsageBadge, + }) + } + /> + ) : null + } + control={ + + updateSettings({ + showComposerUsageBadge: Boolean(checked), + }) + } + aria-label="Show composer usage badge" + /> + } + />
@@ -2018,6 +2050,31 @@ function SettingsRouteView() { } /> + + + {defaultLegacyT3BaseDir} + + + Looks for `userdata/state.sqlite` first and falls back to `dev/state.sqlite`. + + + } + control={ + + } + /> + ; +type ThreadMessagesImportedEvent = Extract< + OrchestrationEvent, + { type: "thread.messages-imported" } +>; +type ThreadActivitiesImportedEvent = Extract< + OrchestrationEvent, + { type: "thread.activities-imported" } +>; +type ThreadProposedPlansImportedEvent = Extract< + OrchestrationEvent, + { type: "thread.proposed-plans-imported" } +>; const PERSISTED_STATE_KEY = "t3code:renderer-state:v8"; const LEGACY_PERSISTED_STATE_KEYS = [ @@ -2542,6 +2554,106 @@ function applyThreadMessageSentEvent(thread: Thread, event: ThreadMessageSentEve }; } +function applyThreadMessagesImportedEvent( + thread: Thread, + event: ThreadMessagesImportedEvent, +): Thread { + return event.payload.messages.reduce( + (nextThread, message) => + applyThreadMessageSentEvent(nextThread, { + ...event, + type: "thread.message-sent", + payload: { + threadId: event.payload.threadId, + messageId: message.messageId, + role: message.role, + text: message.text, + ...(message.attachments !== undefined ? { attachments: message.attachments } : {}), + ...(message.skills !== undefined ? { skills: message.skills } : {}), + ...(message.mentions !== undefined ? { mentions: message.mentions } : {}), + ...(message.dispatchMode !== undefined ? { dispatchMode: message.dispatchMode } : {}), + turnId: message.turnId ?? null, + streaming: message.streaming ?? false, + source: message.source ?? "native", + createdAt: message.createdAt, + updatedAt: message.updatedAt, + }, + }), + thread, + ); +} + +function applyThreadActivitiesImportedEvent( + thread: Thread, + event: ThreadActivitiesImportedEvent, +): Thread { + let nextActivities = thread.activities; + let nextUpdatedAt = thread.updatedAt; + let changed = false; + for (const activity of event.payload.activities) { + const normalizedActivities = normalizeActivities([...nextActivities, activity], nextActivities); + const normalizedUpdatedAt = + (nextUpdatedAt ?? thread.createdAt) > activity.createdAt ? nextUpdatedAt : activity.createdAt; + if (normalizedActivities !== nextActivities) { + nextActivities = normalizedActivities; + changed = true; + } + if (normalizedUpdatedAt !== nextUpdatedAt) { + nextUpdatedAt = normalizedUpdatedAt; + changed = true; + } + } + return changed + ? { + ...thread, + activities: nextActivities, + updatedAt: nextUpdatedAt, + } + : thread; +} + +function applyThreadProposedPlansImportedEvent( + thread: Thread, + event: ThreadProposedPlansImportedEvent, +): Thread { + let nextProposedPlans = thread.proposedPlans; + let nextUpdatedAt = thread.updatedAt; + let changed = false; + for (const proposedPlan of event.payload.proposedPlans) { + const previousPlanIndex = nextProposedPlans.findIndex((plan) => plan.id === proposedPlan.id); + const nextPlan = normalizeProposedPlans( + [proposedPlan], + previousPlanIndex >= 0 ? [nextProposedPlans[previousPlanIndex]!] : undefined, + )[0]; + if (!nextPlan) { + continue; + } + const normalizedProposedPlans = + previousPlanIndex >= 0 + ? nextProposedPlans.map((plan, index) => (index === previousPlanIndex ? nextPlan : plan)) + : [...nextProposedPlans, nextPlan]; + const normalizedUpdatedAt = + (nextUpdatedAt ?? thread.createdAt) > proposedPlan.updatedAt + ? nextUpdatedAt + : proposedPlan.updatedAt; + if (!arraysShallowEqual(nextProposedPlans, normalizedProposedPlans)) { + nextProposedPlans = normalizedProposedPlans; + changed = true; + } + if (normalizedUpdatedAt !== nextUpdatedAt) { + nextUpdatedAt = normalizedUpdatedAt; + changed = true; + } + } + return changed + ? { + ...thread, + proposedPlans: nextProposedPlans, + updatedAt: nextUpdatedAt, + } + : thread; +} + function applyOrchestrationEvent( state: AppState, event: OrchestrationEvent, @@ -2626,6 +2738,8 @@ function applyOrchestrationEvent( const cwdChanged = thread.worktreePath !== nextWorktreePath; if ( + (event.payload.projectId === undefined || + event.payload.projectId === thread.projectId) && (event.payload.title === undefined || event.payload.title === thread.title) && modelSelection === thread.modelSelection && (event.payload.envMode === undefined || event.payload.envMode === thread.envMode) && @@ -2659,6 +2773,9 @@ function applyOrchestrationEvent( return { ...thread, + ...(event.payload.projectId !== undefined + ? { projectId: event.payload.projectId } + : {}), ...(event.payload.title !== undefined ? { title: event.payload.title } : {}), modelSelection, ...(event.payload.envMode !== undefined ? { envMode: event.payload.envMode } : {}), @@ -2704,6 +2821,14 @@ function applyOrchestrationEvent( options, ); + case "thread.messages-imported": + return applyThreadUpdate( + state, + event.payload.threadId, + (thread) => applyThreadMessagesImportedEvent(thread, event), + options, + ); + case "thread.session-set": return applyThreadUpdate( state, @@ -2864,6 +2989,14 @@ function applyOrchestrationEvent( options, ); + case "thread.activities-imported": + return applyThreadUpdate( + state, + event.payload.threadId, + (thread) => applyThreadActivitiesImportedEvent(thread, event), + options, + ); + case "thread.proposed-plan-upserted": return applyThreadUpdate( state, @@ -2900,6 +3033,14 @@ function applyOrchestrationEvent( options, ); + case "thread.proposed-plans-imported": + return applyThreadUpdate( + state, + event.payload.threadId, + (thread) => applyThreadProposedPlansImportedEvent(thread, event), + options, + ); + case "thread.turn-diff-completed": return applyThreadUpdate( state, diff --git a/apps/web/src/wsNativeApi.test.ts b/apps/web/src/wsNativeApi.test.ts index 6da196fb..3e0e1403 100644 --- a/apps/web/src/wsNativeApi.test.ts +++ b/apps/web/src/wsNativeApi.test.ts @@ -25,6 +25,7 @@ const showContextMenuFallbackMock = position?: { x: number; y: number }, ) => Promise >(); +const showConfirmDialogFallbackMock = vi.fn<(message: string) => Promise>(); const channelListeners = new Map void>>(); const latestPushByChannel = new Map(); const subscribeMock = vi.fn< @@ -65,6 +66,10 @@ vi.mock("./contextMenuFallback", () => ({ showContextMenuFallback: showContextMenuFallbackMock, })); +vi.mock("./confirmDialogFallback", () => ({ + showConfirmDialogFallback: showConfirmDialogFallbackMock, +})); + let nextPushSequence = 1; function emitPush(channel: C, data: WsPushData): void { @@ -106,6 +111,7 @@ beforeEach(() => { vi.resetModules(); requestMock.mockReset(); showContextMenuFallbackMock.mockReset(); + showConfirmDialogFallbackMock.mockReset(); subscribeMock.mockClear(); channelListeners.clear(); latestPushByChannel.clear(); @@ -458,6 +464,37 @@ describe("wsNativeApi", () => { ); }); + it("uses the desktop confirm bridge when available", async () => { + const confirm = vi.fn().mockResolvedValue(true); + Object.defineProperty(getWindowForTest(), "desktopBridge", { + configurable: true, + writable: true, + value: { + confirm, + }, + }); + + const { createWsNativeApi } = await import("./wsNativeApi"); + const api = createWsNativeApi(); + const result = await api.dialogs.confirm("Import from T3 Code?"); + + expect(result).toBe(true); + expect(confirm).toHaveBeenCalledWith("Import from T3 Code?"); + expect(showConfirmDialogFallbackMock).not.toHaveBeenCalled(); + }); + + it("uses fallback confirm when desktop bridge is unavailable", async () => { + showConfirmDialogFallbackMock.mockResolvedValue(false); + Reflect.deleteProperty(getWindowForTest(), "desktopBridge"); + + const { createWsNativeApi } = await import("./wsNativeApi"); + const api = createWsNativeApi(); + const result = await api.dialogs.confirm("Import from T3 Code?"); + + expect(result).toBe(false); + expect(showConfirmDialogFallbackMock).toHaveBeenCalledWith("Import from T3 Code?"); + }); + it("uses the desktop voice bridge when available", async () => { const transcribeVoice = vi.fn().mockResolvedValue({ text: "hello" }); Object.defineProperty(getWindowForTest(), "desktopBridge", { diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index a97da17b..592bb393 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -285,6 +285,9 @@ export function createWsNativeApi(): NativeApi { return window.desktopBridge.pickFolder(); }, confirm: async (message) => { + if (window.desktopBridge?.confirm) { + return window.desktopBridge.confirm(message); + } return showConfirmDialogFallback(message); }, }, @@ -372,6 +375,8 @@ export function createWsNativeApi(): NativeApi { getConfig: () => transport.request(WS_METHODS.serverGetConfig), refreshProviders: () => transport.request(WS_METHODS.serverRefreshProviders), listWorktrees: () => transport.request(WS_METHODS.serverListWorktrees), + getProviderUsageSnapshot: (input) => + transport.request(WS_METHODS.serverGetProviderUsageSnapshot, input), transcribeVoice: (input) => { if (window.desktopBridge?.server?.transcribeVoice) { return window.desktopBridge.server.transcribeVoice(input); @@ -396,6 +401,10 @@ export function createWsNativeApi(): NativeApi { dispatchCommand: (command) => transport.request(ORCHESTRATION_WS_METHODS.dispatchCommand, { command }), importThread: (input) => transport.request(ORCHESTRATION_WS_METHODS.importThread, input), + importLegacyT3State: (input) => + transport.request(ORCHESTRATION_WS_METHODS.importLegacyT3State, input, { + timeoutMs: null, + }), repairState: () => transport.request(ORCHESTRATION_WS_METHODS.repairState), getTurnDiff: (input) => transport.request(ORCHESTRATION_WS_METHODS.getTurnDiff, input), getFullThreadDiff: (input) => diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index 9334a4cb..a8699404 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -37,6 +37,8 @@ import type { } from "./project"; import type { ServerConfig, + ServerGetProviderUsageSnapshotInput, + ServerGetProviderUsageSnapshotResult, ServerListWorktreesResult, ServerRefreshProvidersResult, ServerUpsertKeybindingInput, @@ -60,6 +62,8 @@ import type { OrchestrationGetFullThreadDiffResult, OrchestrationImportThreadInput, OrchestrationImportThreadResult, + OrchestrationImportLegacyT3StateInput, + OrchestrationImportLegacyT3StateResult, OrchestrationGetTurnDiffInput, OrchestrationGetTurnDiffResult, OrchestrationEvent, @@ -308,6 +312,9 @@ export interface NativeApi { getConfig: () => Promise; refreshProviders: () => Promise; listWorktrees: () => Promise; + getProviderUsageSnapshot: ( + input: ServerGetProviderUsageSnapshotInput, + ) => Promise; transcribeVoice: ( input: ServerVoiceTranscriptionInput, ) => Promise; @@ -331,6 +338,9 @@ export interface NativeApi { importThread: ( input: OrchestrationImportThreadInput, ) => Promise; + importLegacyT3State: ( + input: OrchestrationImportLegacyT3StateInput, + ) => Promise; repairState: () => Promise; getTurnDiff: (input: OrchestrationGetTurnDiffInput) => Promise; getFullThreadDiff: ( diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index cb2aea7c..61d8a3fe 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -22,6 +22,7 @@ export const ORCHESTRATION_WS_METHODS = { getSnapshot: "orchestration.getSnapshot", dispatchCommand: "orchestration.dispatchCommand", importThread: "orchestration.importThread", + importLegacyT3State: "orchestration.importLegacyT3State", repairState: "orchestration.repairState", getTurnDiff: "orchestration.getTurnDiff", getFullThreadDiff: "orchestration.getFullThreadDiff", @@ -628,6 +629,24 @@ export const ThreadHandoffImportedMessage = Schema.Struct({ }); export type ThreadHandoffImportedMessage = typeof ThreadHandoffImportedMessage.Type; +export const ImportedThreadMessage = Schema.Struct({ + messageId: MessageId, + role: OrchestrationMessageRole, + text: Schema.String, + attachments: Schema.optional(Schema.Array(ChatAttachment)), + skills: Schema.optional(Schema.Array(ProviderSkillReference)), + mentions: Schema.optional(Schema.Array(ProviderMentionReference)), + dispatchMode: Schema.optional(TurnDispatchMode), + turnId: Schema.optional(Schema.NullOr(TurnId)).pipe(Schema.withDecodingDefault(() => null)), + streaming: Schema.optional(Schema.Boolean).pipe(Schema.withDecodingDefault(() => false)), + source: Schema.optional(OrchestrationMessageSource).pipe( + Schema.withDecodingDefault(() => "native"), + ), + createdAt: IsoDateTime, + updatedAt: IsoDateTime, +}); +export type ImportedThreadMessage = typeof ImportedThreadMessage.Type; + const ThreadHandoffCreateCommand = Schema.Struct({ type: Schema.Literal("thread.handoff.create"), commandId: CommandId, @@ -694,6 +713,7 @@ const ThreadMetaUpdateCommand = Schema.Struct({ type: Schema.Literal("thread.meta.update"), commandId: CommandId, threadId: ThreadId, + projectId: Schema.optional(ProjectId), title: Schema.optional(TrimmedNonEmptyString), modelSelection: Schema.optional(ModelSelection), envMode: Schema.optional(ThreadEnvironmentMode), @@ -896,7 +916,23 @@ const ThreadMessagesImportCommand = Schema.Struct({ type: Schema.Literal("thread.messages.import"), commandId: CommandId, threadId: ThreadId, - messages: Schema.Array(ThreadHandoffImportedMessage), + messages: Schema.Array(ImportedThreadMessage), + createdAt: IsoDateTime, +}); + +const ThreadProposedPlansImportCommand = Schema.Struct({ + type: Schema.Literal("thread.proposed-plans.import"), + commandId: CommandId, + threadId: ThreadId, + proposedPlans: Schema.Array(OrchestrationProposedPlan), + createdAt: IsoDateTime, +}); + +const ThreadActivitiesImportCommand = Schema.Struct({ + type: Schema.Literal("thread.activities.import"), + commandId: CommandId, + threadId: ThreadId, + activities: Schema.Array(OrchestrationThreadActivity), createdAt: IsoDateTime, }); @@ -960,6 +996,8 @@ const ThreadRevertCompleteCommand = Schema.Struct({ const InternalOrchestrationCommand = Schema.Union([ ThreadSessionSetCommand, ThreadMessagesImportCommand, + ThreadProposedPlansImportCommand, + ThreadActivitiesImportCommand, ThreadMessageAssistantDeltaCommand, ThreadMessageAssistantCompleteCommand, ThreadProposedPlanUpsertCommand, @@ -998,6 +1036,9 @@ export const OrchestrationEventType = Schema.Literals([ "thread.reverted", "thread.session-stop-requested", "thread.session-set", + "thread.messages-imported", + "thread.proposed-plans-imported", + "thread.activities-imported", "thread.proposed-plan-upserted", "thread.turn-diff-completed", "thread.activity-appended", @@ -1100,6 +1141,7 @@ export const ThreadUnarchivedPayload = Schema.Struct({ export const ThreadMetaUpdatedPayload = Schema.Struct({ threadId: ThreadId, + projectId: Schema.optional(ProjectId), title: Schema.optional(TrimmedNonEmptyString), modelSelection: Schema.optional(ModelSelection), envMode: Schema.optional(ThreadEnvironmentMode), @@ -1206,6 +1248,21 @@ export const ThreadSessionSetPayload = Schema.Struct({ session: OrchestrationSession, }); +export const ThreadMessagesImportedPayload = Schema.Struct({ + threadId: ThreadId, + messages: Schema.Array(ImportedThreadMessage), +}); + +export const ThreadProposedPlansImportedPayload = Schema.Struct({ + threadId: ThreadId, + proposedPlans: Schema.Array(OrchestrationProposedPlan), +}); + +export const ThreadActivitiesImportedPayload = Schema.Struct({ + threadId: ThreadId, + activities: Schema.Array(OrchestrationThreadActivity), +}); + export const ThreadProposedPlanUpsertedPayload = Schema.Struct({ threadId: ThreadId, proposedPlan: OrchestrationProposedPlan, @@ -1349,6 +1406,21 @@ export const OrchestrationEvent = Schema.Union([ type: Schema.Literal("thread.session-set"), payload: ThreadSessionSetPayload, }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.messages-imported"), + payload: ThreadMessagesImportedPayload, + }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.proposed-plans-imported"), + payload: ThreadProposedPlansImportedPayload, + }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.activities-imported"), + payload: ThreadActivitiesImportedPayload, + }), Schema.Struct({ ...EventBaseFields, type: Schema.Literal("thread.proposed-plan-upserted"), @@ -1507,6 +1579,31 @@ export const OrchestrationImportThreadResult = Schema.Struct({ }); export type OrchestrationImportThreadResult = typeof OrchestrationImportThreadResult.Type; +export const OrchestrationImportLegacyT3StateInput = Schema.Struct({ + sourceBaseDir: Schema.optional(TrimmedNonEmptyString), +}); +export type OrchestrationImportLegacyT3StateInput = + typeof OrchestrationImportLegacyT3StateInput.Type; + +export const OrchestrationImportLegacyT3StateResult = Schema.Struct({ + sourceBaseDir: TrimmedNonEmptyString, + sourceStateDir: TrimmedNonEmptyString, + importedProjects: NonNegativeInt, + mappedProjects: NonNegativeInt, + skippedProjects: NonNegativeInt, + importedThreads: NonNegativeInt, + skippedThreads: NonNegativeInt, + importedMessages: NonNegativeInt, + importedActivities: NonNegativeInt, + importedProposedPlans: NonNegativeInt, + importedCheckpoints: NonNegativeInt, + copiedAttachments: NonNegativeInt, + skippedAttachments: NonNegativeInt, + missingAttachments: NonNegativeInt, +}); +export type OrchestrationImportLegacyT3StateResult = + typeof OrchestrationImportLegacyT3StateResult.Type; + export const OrchestrationUnsubscribeThreadInput = Schema.Struct({ threadId: ThreadId, }); @@ -1529,6 +1626,10 @@ export const OrchestrationRpcSchemas = { input: OrchestrationImportThreadInput, output: OrchestrationImportThreadResult, }, + importLegacyT3State: { + input: OrchestrationImportLegacyT3StateInput, + output: OrchestrationImportLegacyT3StateResult, + }, getTurnDiff: { input: OrchestrationGetTurnDiffInput, output: OrchestrationGetTurnDiffResult, diff --git a/packages/contracts/src/server.ts b/packages/contracts/src/server.ts index 3653dc5b..ae2b3393 100644 --- a/packages/contracts/src/server.ts +++ b/packages/contracts/src/server.ts @@ -71,6 +71,41 @@ export const ServerListWorktreesResult = Schema.Struct({ }); export type ServerListWorktreesResult = typeof ServerListWorktreesResult.Type; +export const ServerProviderUsageLimit = Schema.Struct({ + window: TrimmedNonEmptyString, + usedPercent: Schema.optional( + Schema.Number.check(Schema.isGreaterThanOrEqualTo(0)).check(Schema.isLessThanOrEqualTo(100)), + ), + resetsAt: Schema.optional(IsoDateTime), + windowDurationMins: Schema.optional(NonNegativeInt), +}); +export type ServerProviderUsageLimit = typeof ServerProviderUsageLimit.Type; + +export const ServerProviderUsageLine = Schema.Struct({ + label: TrimmedNonEmptyString, + value: TrimmedNonEmptyString, + subtitle: Schema.optional(TrimmedNonEmptyString), +}); +export type ServerProviderUsageLine = typeof ServerProviderUsageLine.Type; + +export const ServerProviderUsageSnapshot = Schema.Struct({ + provider: ProviderKind, + updatedAt: IsoDateTime, + limits: Schema.Array(ServerProviderUsageLimit), + usageLines: Schema.Array(ServerProviderUsageLine), + source: TrimmedNonEmptyString, +}); +export type ServerProviderUsageSnapshot = typeof ServerProviderUsageSnapshot.Type; + +export const ServerGetProviderUsageSnapshotInput = Schema.Struct({ + provider: ProviderKind, + homePath: Schema.optional(TrimmedNonEmptyString), +}); +export type ServerGetProviderUsageSnapshotInput = typeof ServerGetProviderUsageSnapshotInput.Type; + +export const ServerGetProviderUsageSnapshotResult = Schema.NullOr(ServerProviderUsageSnapshot); +export type ServerGetProviderUsageSnapshotResult = typeof ServerGetProviderUsageSnapshotResult.Type; + export const ServerVoiceTranscriptionInput = Schema.Struct({ provider: ProviderKind, cwd: TrimmedNonEmptyString, diff --git a/packages/contracts/src/ws.ts b/packages/contracts/src/ws.ts index 0929c8d7..568cfa2c 100644 --- a/packages/contracts/src/ws.ts +++ b/packages/contracts/src/ws.ts @@ -5,6 +5,7 @@ import { ClientOrchestrationCommand, OrchestrationEvent, OrchestrationImportThreadInput, + OrchestrationImportLegacyT3StateInput, OrchestrationShellStreamItem, OrchestrationSubscribeShellInput, OrchestrationSubscribeThreadInput, @@ -55,6 +56,7 @@ import { import { OpenInEditorInput } from "./editor"; import { ServerConfigUpdatedPayload, + ServerGetProviderUsageSnapshotInput, ServerProviderStatusesUpdatedPayload, ServerVoiceTranscriptionInput, } from "./server"; @@ -112,6 +114,7 @@ export const WS_METHODS = { serverGetConfig: "server.getConfig", serverRefreshProviders: "server.refreshProviders", serverListWorktrees: "server.listWorktrees", + serverGetProviderUsageSnapshot: "server.getProviderUsageSnapshot", serverTranscribeVoice: "server.transcribeVoice", serverUpsertKeybinding: "server.upsertKeybinding", @@ -155,6 +158,10 @@ const WebSocketRequestBody = Schema.Union([ Schema.Struct({ command: ClientOrchestrationCommand }), ), tagRequestBody(ORCHESTRATION_WS_METHODS.importThread, OrchestrationImportThreadInput), + tagRequestBody( + ORCHESTRATION_WS_METHODS.importLegacyT3State, + OrchestrationImportLegacyT3StateInput, + ), tagRequestBody(ORCHESTRATION_WS_METHODS.getSnapshot, OrchestrationGetSnapshotInput), tagRequestBody(ORCHESTRATION_WS_METHODS.repairState, OrchestrationRepairStateInput), tagRequestBody(ORCHESTRATION_WS_METHODS.getTurnDiff, OrchestrationGetTurnDiffInput), @@ -202,6 +209,7 @@ const WebSocketRequestBody = Schema.Union([ tagRequestBody(WS_METHODS.serverGetConfig, Schema.Struct({})), tagRequestBody(WS_METHODS.serverRefreshProviders, Schema.Struct({})), tagRequestBody(WS_METHODS.serverListWorktrees, Schema.Struct({})), + tagRequestBody(WS_METHODS.serverGetProviderUsageSnapshot, ServerGetProviderUsageSnapshotInput), tagRequestBody(WS_METHODS.serverTranscribeVoice, ServerVoiceTranscriptionInput), tagRequestBody(WS_METHODS.serverUpsertKeybinding, KeybindingRule),