From c5e06ecb82a6aa95addf060ba3f8b264c4698a49 Mon Sep 17 00:00:00 2001 From: Baha chammakhi Date: Thu, 25 Jun 2026 01:10:29 +0100 Subject: [PATCH] feat: delegate task --- apps/desktop/package.json | 2 +- apps/server/src/mcp/McpHttpServer.ts | 16 +- .../src/mcp/toolkits/delegation/handlers.ts | 64 ++++ .../src/mcp/toolkits/delegation/tools.ts | 59 ++++ .../Layers/ProjectionPipeline.ts | 2 + .../Layers/ProjectionSnapshotQuery.ts | 20 ++ .../orchestration/Layers/TaskOrchestrator.ts | 281 ++++++++++++++++++ .../Services/TaskOrchestrator.ts | 60 ++++ .../src/orchestration/TaskModelRouter.test.ts | 92 ++++++ .../src/orchestration/TaskModelRouter.ts | 70 +++++ apps/server/src/orchestration/decider.ts | 4 + apps/server/src/orchestration/projector.ts | 2 + .../Layers/ProjectionRepositories.test.ts | 2 + .../persistence/Layers/ProjectionThreads.ts | 10 + apps/server/src/persistence/Migrations.ts | 2 + .../033_ProjectionThreadDelegation.ts | 30 ++ .../persistence/Services/ProjectionThreads.ts | 2 + apps/server/src/server.ts | 9 +- apps/web/src/components/Sidebar.logic.test.ts | 35 +++ apps/web/src/components/Sidebar.logic.ts | 34 +++ apps/web/src/components/Sidebar.tsx | 27 +- packages/contracts/src/index.ts | 1 + packages/contracts/src/orchestration.ts | 10 + packages/contracts/src/settings.ts | 28 ++ packages/contracts/src/taskDelegation.ts | 70 +++++ scripts/build-desktop-artifact.test.ts | 8 +- scripts/build-desktop-artifact.ts | 2 +- 27 files changed, 927 insertions(+), 15 deletions(-) create mode 100644 apps/server/src/mcp/toolkits/delegation/handlers.ts create mode 100644 apps/server/src/mcp/toolkits/delegation/tools.ts create mode 100644 apps/server/src/orchestration/Layers/TaskOrchestrator.ts create mode 100644 apps/server/src/orchestration/Services/TaskOrchestrator.ts create mode 100644 apps/server/src/orchestration/TaskModelRouter.test.ts create mode 100644 apps/server/src/orchestration/TaskModelRouter.ts create mode 100644 apps/server/src/persistence/Migrations/033_ProjectionThreadDelegation.ts create mode 100644 packages/contracts/src/taskDelegation.ts diff --git a/apps/desktop/package.json b/apps/desktop/package.json index bb52416cc77..5f015e00d8c 100644 --- a/apps/desktop/package.json +++ b/apps/desktop/package.json @@ -35,5 +35,5 @@ "tailwindcss": "^4.0.0", "vite-plus": "catalog:" }, - "productName": "T3 Code (Alpha)" + "productName": "T3 Code Baha" } diff --git a/apps/server/src/mcp/McpHttpServer.ts b/apps/server/src/mcp/McpHttpServer.ts index e95662a30f8..bc5cf3c53d3 100644 --- a/apps/server/src/mcp/McpHttpServer.ts +++ b/apps/server/src/mcp/McpHttpServer.ts @@ -13,6 +13,8 @@ import packageJson from "../../package.json" with { type: "json" }; import * as McpInvocationContext from "./McpInvocationContext.ts"; import * as McpSessionRegistry from "./McpSessionRegistry.ts"; import * as PreviewAutomationBroker from "./PreviewAutomationBroker.ts"; +import { DelegationToolkitHandlersLive } from "./toolkits/delegation/handlers.ts"; +import { DelegationToolkit } from "./toolkits/delegation/tools.ts"; import { PreviewSnapshotToolkitHandlersLive, PreviewStandardToolkitHandlersLive, @@ -208,13 +210,19 @@ export const PreviewToolkitRegistrationLive = Layer.mergeAll( PreviewSnapshotRegistrationLive, ); +// Exposes the `delegate_tasks` tool to every provider session. Requires +// `TaskOrchestrator`, satisfied by the orchestration runtime (RuntimeServicesLive). +const DelegationToolkitRegistrationLive = McpServer.toolkit(DelegationToolkit).pipe( + Layer.provide(DelegationToolkitHandlersLive), +); + const McpTransportLive = McpServer.layerHttp({ name: "T3 Code", version: packageJson.version, path: "/mcp", }).pipe(Layer.provide(McpAuthMiddlewareLive)); -export const layer = PreviewToolkitRegistrationLive.pipe( - Layer.provideMerge(McpTransportLive), - Layer.provide(PreviewAutomationBroker.layer), -); +export const layer = Layer.mergeAll( + PreviewToolkitRegistrationLive, + DelegationToolkitRegistrationLive, +).pipe(Layer.provideMerge(McpTransportLive), Layer.provide(PreviewAutomationBroker.layer)); diff --git a/apps/server/src/mcp/toolkits/delegation/handlers.ts b/apps/server/src/mcp/toolkits/delegation/handlers.ts new file mode 100644 index 00000000000..c733a21e10d --- /dev/null +++ b/apps/server/src/mcp/toolkits/delegation/handlers.ts @@ -0,0 +1,64 @@ +import type { DelegateTaskResult } from "@t3tools/contracts"; +import * as Effect from "effect/Effect"; + +import { TaskOrchestrator } from "../../../orchestration/Services/TaskOrchestrator.ts"; +import * as McpInvocationContext from "../../McpInvocationContext.ts"; +import { DelegationToolkit } from "./tools.ts"; + +// Bounded wait kept comfortably under typical MCP client request timeouts so a +// single tool call never times out, even when sub-tasks take minutes. Slow +// sub-tasks come back 'running' and are picked up by collect_delegated_tasks. +const INLINE_WAIT_MS = 30_000; + +const handlers = { + delegate_tasks: (input) => + Effect.gen(function* () { + const scope = yield* McpInvocationContext.McpInvocationContext; + const orchestrator = yield* TaskOrchestrator; + // Depth-1 recursion guard: a thread that was itself spawned as a + // delegated sub-task may not delegate further. + const isChild = yield* orchestrator.isDelegatedChild(scope.threadId); + if (isChild) { + return yield* Effect.fail({ + reason: "Delegated sub-tasks cannot delegate further (depth-1 limit).", + }); + } + const started = yield* orchestrator.startTasks({ + parentThreadId: scope.threadId, + tasks: input.tasks, + maxConcurrency: input.maxConcurrency, + }); + const runningIds = started.results + .filter((result) => result.status === "running") + .map((result) => result.threadId); + if (runningIds.length === 0) { + return started; + } + // Wait a bounded time so fast sub-tasks come back inline; the rest stay + // 'running' for collect_delegated_tasks to pick up later. + const collected = yield* orchestrator.collectTasks({ + parentThreadId: scope.threadId, + threadIds: runningIds, + waitMs: INLINE_WAIT_MS, + }); + const collectedByThread = new Map( + collected.results.map((result) => [result.threadId, result]), + ); + const results = started.results.map((result) => + result.status === "error" ? result : (collectedByThread.get(result.threadId) ?? result), + ); + return { results }; + }), + collect_delegated_tasks: (input) => + Effect.gen(function* () { + const scope = yield* McpInvocationContext.McpInvocationContext; + const orchestrator = yield* TaskOrchestrator; + return yield* orchestrator.collectTasks({ + parentThreadId: scope.threadId, + threadIds: input.threadIds, + waitMs: INLINE_WAIT_MS, + }); + }), +} satisfies Parameters[0]; + +export const DelegationToolkitHandlersLive = DelegationToolkit.toLayer(handlers); diff --git a/apps/server/src/mcp/toolkits/delegation/tools.ts b/apps/server/src/mcp/toolkits/delegation/tools.ts new file mode 100644 index 00000000000..b872a4cd0f3 --- /dev/null +++ b/apps/server/src/mcp/toolkits/delegation/tools.ts @@ -0,0 +1,59 @@ +import { + CollectDelegatedTasksRequest, + DelegateTasksRequest, + DelegateTasksResult, +} from "@t3tools/contracts"; +import * as Schema from "effect/Schema"; +import { Tool, Toolkit } from "effect/unstable/ai"; + +import { TaskOrchestrator } from "../../../orchestration/Services/TaskOrchestrator.ts"; +import * as McpInvocationContext from "../../McpInvocationContext.ts"; + +const dependencies = [McpInvocationContext.McpInvocationContext, TaskOrchestrator]; + +/** Returned only when the call is refused (e.g. a child trying to delegate). */ +export const DelegateTasksFailure = Schema.Struct({ + reason: Schema.String, +}); + +export const DelegateTasksTool = Tool.make("delegate_tasks", { + description: + "Delegate sub-tasks to fresh child agents that run in parallel. Returns quickly: each result is " + + "either finished ('completed'/'error') or still 'running' (with its `threadId`). For any result " + + "with status 'running', call `collect_delegated_tasks` afterwards to get its final message — do not " + + "assume a 'running' task failed. " + + "WHEN TO USE — proactively, without being asked: whenever a request breaks into 2+ parts that do " + + "not depend on each other's output (research several files at once, answer multiple questions, " + + "review/summarize different modules, independent edits). Skip it for a single linear task or when " + + "one step's result feeds the next. " + + "CHOOSING THE MODEL per sub-task — set `modelHint`: 'cheap' for trivial/mechanical lookups (list " + + "files, grep, count, simple summaries), 'balanced' for normal work (default when unsure), 'strong' " + + "for hard reasoning, code review, or tricky debugging. For planning or design sub-tasks, put the word " + + '"plan" in the `label` so they route to the strongest model. Pass `modelSelection` instead only when ' + + "the user named a specific model. " + + "Always give each sub-task a short descriptive `label`. Sub-tasks share this thread's working " + + "directory and run concurrently, so keep their file changes non-overlapping to avoid conflicts.", + parameters: DelegateTasksRequest, + success: DelegateTasksResult, + failure: DelegateTasksFailure, + dependencies, +}) + .annotate(Tool.Title, "Delegate sub-tasks") + .annotate(Tool.Destructive, true) + .annotate(Tool.OpenWorld, true); + +export const CollectDelegatedTasksTool = Tool.make("collect_delegated_tasks", { + description: + "Collect results for sub-tasks that were still 'running' when `delegate_tasks` (or a prior " + + "`collect_delegated_tasks`) returned. Pass the `threadIds` of the running sub-tasks, or omit them to " + + "collect every still-running sub-task you delegated from this thread. Returns quickly; any task still " + + "not finished comes back as 'running' again — keep calling until none remain 'running'.", + parameters: CollectDelegatedTasksRequest, + success: DelegateTasksResult, + dependencies, +}) + .annotate(Tool.Title, "Collect delegated sub-tasks") + .annotate(Tool.Readonly, true) + .annotate(Tool.Idempotent, true); + +export const DelegationToolkit = Toolkit.make(DelegateTasksTool, CollectDelegatedTasksTool); diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index f12df850941..5f4fd9d3183 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -603,6 +603,8 @@ const makeOrchestrationProjectionPipeline = Effect.fn("makeOrchestrationProjecti interactionMode: event.payload.interactionMode, branch: event.payload.branch, worktreePath: event.payload.worktreePath, + parentThreadId: event.payload.parentThreadId ?? null, + taskLabel: event.payload.taskLabel ?? null, latestTurnId: null, createdAt: event.payload.createdAt, updatedAt: event.payload.updatedAt, diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index e36db35b107..eab69e3e0df 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -329,6 +329,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interaction_mode AS "interactionMode", branch, worktree_path AS "worktreePath", + parent_thread_id AS "parentThreadId", + task_label AS "taskLabel", latest_turn_id AS "latestTurnId", created_at AS "createdAt", updated_at AS "updatedAt", @@ -357,6 +359,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interaction_mode AS "interactionMode", branch, worktree_path AS "worktreePath", + parent_thread_id AS "parentThreadId", + task_label AS "taskLabel", latest_turn_id AS "latestTurnId", created_at AS "createdAt", updated_at AS "updatedAt", @@ -387,6 +391,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interaction_mode AS "interactionMode", branch, worktree_path AS "worktreePath", + parent_thread_id AS "parentThreadId", + task_label AS "taskLabel", latest_turn_id AS "latestTurnId", created_at AS "createdAt", updated_at AS "updatedAt", @@ -749,6 +755,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interaction_mode AS "interactionMode", branch, worktree_path AS "worktreePath", + parent_thread_id AS "parentThreadId", + task_label AS "taskLabel", latest_turn_id AS "latestTurnId", created_at AS "createdAt", updated_at AS "updatedAt", @@ -1181,6 +1189,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interactionMode: row.interactionMode, branch: row.branch, worktreePath: row.worktreePath, + parentThreadId: row.parentThreadId ?? null, + taskLabel: row.taskLabel ?? null, latestTurn: latestTurnByThread.get(row.threadId) ?? null, createdAt: row.createdAt, updatedAt: row.updatedAt, @@ -1379,6 +1389,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interactionMode: row.interactionMode, branch: row.branch, worktreePath: row.worktreePath, + parentThreadId: row.parentThreadId ?? null, + taskLabel: row.taskLabel ?? null, latestTurn: latestTurnByThread.get(row.threadId) ?? null, createdAt: row.createdAt, updatedAt: row.updatedAt, @@ -1508,6 +1520,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interactionMode: row.interactionMode, branch: row.branch, worktreePath: row.worktreePath, + parentThreadId: row.parentThreadId ?? null, + taskLabel: row.taskLabel ?? null, latestTurn: latestTurnByThread.get(row.threadId) ?? null, createdAt: row.createdAt, updatedAt: row.updatedAt, @@ -1642,6 +1656,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interactionMode: row.interactionMode, branch: row.branch, worktreePath: row.worktreePath, + parentThreadId: row.parentThreadId ?? null, + taskLabel: row.taskLabel ?? null, latestTurn: latestTurnByThread.get(row.threadId) ?? null, createdAt: row.createdAt, updatedAt: row.updatedAt, @@ -1882,6 +1898,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interactionMode: threadRow.value.interactionMode, branch: threadRow.value.branch, worktreePath: threadRow.value.worktreePath, + parentThreadId: threadRow.value.parentThreadId ?? null, + taskLabel: threadRow.value.taskLabel ?? null, latestTurn: Option.isSome(latestTurnRow) ? mapLatestTurn(latestTurnRow.value) : null, createdAt: threadRow.value.createdAt, updatedAt: threadRow.value.updatedAt, @@ -1976,6 +1994,8 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { interactionMode: threadRow.value.interactionMode, branch: threadRow.value.branch, worktreePath: threadRow.value.worktreePath, + parentThreadId: threadRow.value.parentThreadId ?? null, + taskLabel: threadRow.value.taskLabel ?? null, latestTurn: Option.isSome(latestTurnRow) ? mapLatestTurn(latestTurnRow.value) : null, createdAt: threadRow.value.createdAt, updatedAt: threadRow.value.updatedAt, diff --git a/apps/server/src/orchestration/Layers/TaskOrchestrator.ts b/apps/server/src/orchestration/Layers/TaskOrchestrator.ts new file mode 100644 index 00000000000..4191118647b --- /dev/null +++ b/apps/server/src/orchestration/Layers/TaskOrchestrator.ts @@ -0,0 +1,281 @@ +/** + * TaskOrchestrator live layer. + * + * `startTasks` spawns one child thread per delegated sub-task in the lead's + * working directory (shared cwd) and starts its turn, returning immediately. + * `collectTasks` later polls those children up to a bounded deadline and reads + * back their final assistant message. Completion is detected via + * `latestTurn.state` leaving `"running"` (the production RuntimeReceiptBus is a + * no-op, so we do not rely on it). Splitting spawn from collect keeps each MCP + * tool call short enough to fit inside the provider's request timeout. + * + * @module TaskOrchestrator + */ +import { + CommandId, + type DelegateTaskInput, + type DelegateTaskResult, + DEFAULT_DELEGATION_CONCURRENCY, + MAX_DELEGATED_TASKS, + MessageId, + type ModelSelection, + type OrchestrationThread, + ThreadId, +} from "@t3tools/contracts"; +import * as Cause from "effect/Cause"; +import * as Clock from "effect/Clock"; +import * as Crypto from "effect/Crypto"; +import * as DateTime from "effect/DateTime"; +import * as Duration from "effect/Duration"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as Ref from "effect/Ref"; + +import { ServerSettingsService } from "../../serverSettings.ts"; +import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts"; +import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts"; +import { + TaskOrchestrator, + type TaskOrchestratorCollectInput, + type TaskOrchestratorShape, + type TaskOrchestratorStartInput, +} from "../Services/TaskOrchestrator.ts"; +import { resolveTaskModelSelection } from "../TaskModelRouter.ts"; + +/** How often to poll the read model for sub-task completion. */ +const POLL_INTERVAL = Duration.millis(250); +/** Title fallback length when a sub-task has no explicit label. */ +const TITLE_MAX_LENGTH = 80; + +/** Per-child metadata kept so collect can label/attribute results to their lead. */ +interface DelegatedTaskMeta { + readonly label: string | undefined; + readonly modelSelection: ModelSelection; + readonly parentThreadId: ThreadId; +} + +const deriveTitle = (task: DelegateTaskInput): string => { + if (task.label !== undefined) return task.label; + // First non-blank line of the prompt; prompt is guaranteed non-empty so this + // always yields a non-empty title (required by the thread title schema). + const firstLine = + task.prompt + .split("\n") + .map((line) => line.trim()) + .find((line) => line.length > 0) ?? task.prompt.trim(); + return firstLine.length > TITLE_MAX_LENGTH + ? `${firstLine.slice(0, TITLE_MAX_LENGTH - 1)}…` + : firstLine; +}; + +const finalAssistantText = (thread: OrchestrationThread): string | undefined => { + const latestTurn = thread.latestTurn; + if (latestTurn?.assistantMessageId != null) { + const byId = thread.messages.find((message) => message.id === latestTurn.assistantMessageId); + if (byId !== undefined) return byId.text; + } + const assistantMessages = thread.messages.filter( + (message) => + message.role === "assistant" && (latestTurn === null || message.turnId === latestTurn.turnId), + ); + return assistantMessages.at(-1)?.text; +}; + +const makeTaskOrchestrator = Effect.gen(function* () { + const engine = yield* OrchestrationEngineService; + const query = yield* ProjectionSnapshotQuery; + const settingsService = yield* ServerSettingsService; + const crypto = yield* Crypto.Crypto; + const registry = yield* Ref.make>(new Map()); + + const newUuid = crypto.randomUUIDv4.pipe(Effect.orDie); + const nowIso = Effect.map(DateTime.now, DateTime.formatIso); + + const readDetail = (threadId: ThreadId): Effect.Effect => + query.getThreadDetailById(threadId).pipe(Effect.map(Option.getOrUndefined), Effect.orDie); + + const pollUntilDone = ( + threadId: ThreadId, + deadlineMs: number, + ): Effect.Effect => + Effect.gen(function* () { + const detail = yield* readDetail(threadId); + const latestTurn = detail?.latestTurn ?? null; + if (detail !== undefined && latestTurn !== null && latestTurn.state !== "running") { + return detail; + } + const now = yield* Clock.currentTimeMillis; + if (now >= deadlineMs) return detail; + yield* Effect.sleep(POLL_INTERVAL); + return yield* pollUntilDone(threadId, deadlineMs); + }); + + const resultBase = (threadId: ThreadId, meta: DelegatedTaskMeta | undefined) => ({ + label: meta?.label, + threadId, + instanceId: meta?.modelSelection.instanceId, + model: meta?.modelSelection.model, + }); + + const resultFromDetail = ( + threadId: ThreadId, + meta: DelegatedTaskMeta | undefined, + detail: OrchestrationThread | undefined, + ): DelegateTaskResult => { + const base = resultBase(threadId, meta); + const latestTurn = detail?.latestTurn ?? null; + if (detail === undefined || latestTurn === null || latestTurn.state === "running") { + return { ...base, status: "running" } satisfies DelegateTaskResult; + } + if (latestTurn.state === "error") { + return { + ...base, + status: "error", + error: detail.session?.lastError ?? "Sub-task ended in an error state.", + } satisfies DelegateTaskResult; + } + return { + ...base, + status: "completed", + message: finalAssistantText(detail) ?? "", + } satisfies DelegateTaskResult; + }; + + const spawnOne = ( + parent: OrchestrationThread, + routing: TaskOrchestratorRoutingSettings, + task: DelegateTaskInput, + ): Effect.Effect => + Effect.gen(function* () { + const childId = ThreadId.make(yield* newUuid); + const modelSelection = resolveTaskModelSelection(task, { + parentModelSelection: parent.modelSelection, + routing, + }); + const base = { + label: task.label, + threadId: childId, + instanceId: modelSelection.instanceId, + model: modelSelection.model, + } as const; + + return yield* Effect.gen(function* () { + const createdAt = yield* nowIso; + yield* engine + .dispatch({ + type: "thread.create", + commandId: CommandId.make(`mcp:delegate:create:${childId}`), + threadId: childId, + projectId: parent.projectId, + title: deriveTitle(task), + modelSelection, + runtimeMode: parent.runtimeMode, + interactionMode: parent.interactionMode, + branch: null, + // Shared working directory: child runs where the lead thread runs. + worktreePath: parent.worktreePath, + parentThreadId: parent.id, + ...(task.label !== undefined ? { taskLabel: task.label } : {}), + createdAt, + }) + .pipe(Effect.orDie); + + const turnAt = yield* nowIso; + const messageId = MessageId.make(yield* newUuid); + yield* engine + .dispatch({ + type: "thread.turn.start", + commandId: CommandId.make(`mcp:delegate:turn:${childId}`), + threadId: childId, + message: { messageId, role: "user", text: task.prompt, attachments: [] }, + modelSelection, + runtimeMode: parent.runtimeMode, + interactionMode: parent.interactionMode, + createdAt: turnAt, + }) + .pipe(Effect.orDie); + + yield* Ref.update(registry, (current) => { + const next = new Map(current); + next.set(childId, { label: task.label, modelSelection, parentThreadId: parent.id }); + return next; + }); + + return { ...base, status: "running" } satisfies DelegateTaskResult; + }).pipe( + Effect.catchCause((cause) => + Effect.succeed({ + ...base, + status: "error", + error: Cause.pretty(cause), + } satisfies DelegateTaskResult), + ), + ); + }); + + const startTasks: TaskOrchestratorShape["startTasks"] = (input: TaskOrchestratorStartInput) => + Effect.gen(function* () { + const parent = yield* readDetail(input.parentThreadId); + if (parent === undefined) { + return { results: [] }; + } + const settings = yield* settingsService.getSettings.pipe(Effect.orDie); + const tasks = input.tasks.slice(0, MAX_DELEGATED_TASKS); + const concurrency = Math.max( + 1, + Math.min(input.maxConcurrency ?? DEFAULT_DELEGATION_CONCURRENCY, MAX_DELEGATED_TASKS), + ); + const results = yield* Effect.forEach( + tasks, + (task) => spawnOne(parent, settings.taskRouting, task), + { concurrency }, + ); + return { results }; + }); + + const collectTasks: TaskOrchestratorShape["collectTasks"] = ( + input: TaskOrchestratorCollectInput, + ) => + Effect.gen(function* () { + const current = yield* Ref.get(registry); + const targetIds: ReadonlyArray = + input.threadIds !== undefined && input.threadIds.length > 0 + ? input.threadIds + : Array.from(current.entries()) + .filter(([, meta]) => meta.parentThreadId === input.parentThreadId) + .map(([id]) => ThreadId.make(id)); + if (targetIds.length === 0) { + return { results: [] }; + } + const startMs = yield* Clock.currentTimeMillis; + const deadlineMs = startMs + Math.max(0, input.waitMs); + const results = yield* Effect.forEach( + targetIds, + (threadId) => + pollUntilDone(threadId, deadlineMs).pipe( + Effect.map((detail) => resultFromDetail(threadId, current.get(threadId), detail)), + Effect.catchCause((cause) => + Effect.succeed({ + ...resultBase(threadId, current.get(threadId)), + status: "error", + error: Cause.pretty(cause), + } satisfies DelegateTaskResult), + ), + ), + { concurrency: Math.max(1, Math.min(targetIds.length, MAX_DELEGATED_TASKS)) }, + ); + return { results }; + }); + + const isDelegatedChild: TaskOrchestratorShape["isDelegatedChild"] = (threadId: ThreadId) => + Ref.get(registry).pipe(Effect.map((current) => current.has(threadId))); + + return TaskOrchestrator.of({ startTasks, collectTasks, isDelegatedChild }); +}); + +// Local alias so the routing param type stays readable without importing the +// settings type (it is the `taskRouting` slice of ServerSettings). +type TaskOrchestratorRoutingSettings = Parameters[1]["routing"]; + +export const layer = Layer.effect(TaskOrchestrator, makeTaskOrchestrator); diff --git a/apps/server/src/orchestration/Services/TaskOrchestrator.ts b/apps/server/src/orchestration/Services/TaskOrchestrator.ts new file mode 100644 index 00000000000..4940eb5c8f7 --- /dev/null +++ b/apps/server/src/orchestration/Services/TaskOrchestrator.ts @@ -0,0 +1,60 @@ +/** + * TaskOrchestrator - server-side fan-out for agent-delegated sub-tasks. + * + * Delegation is split into two non-blocking steps so no single MCP tool call + * outlives the provider's request timeout: + * + * - `startTasks` routes each sub-task to a model, spawns a child thread per + * sub-task in the lead's working directory, kicks off its turn, and returns + * immediately with one `status: "running"` (or `"error"`) handle per task. + * - `collectTasks` polls the given child threads up to a bounded deadline and + * returns their final messages, leaving any that are still going as + * `status: "running"` so the caller can collect them again later. + * + * Spawned children are tracked in-memory (with their lead + routed model) so + * the `delegate_tasks` tool can refuse delegation from an already-delegated + * child (depth-1 recursion guard) and `collect_delegated_tasks` can resolve a + * lead's outstanding children. + * + * @module TaskOrchestrator + */ +import type { DelegateTaskInput, DelegateTasksResult, ThreadId } from "@t3tools/contracts"; +import * as Context from "effect/Context"; +import type * as Effect from "effect/Effect"; + +export interface TaskOrchestratorStartInput { + readonly parentThreadId: ThreadId; + readonly tasks: ReadonlyArray; + readonly maxConcurrency?: number | undefined; +} + +export interface TaskOrchestratorCollectInput { + /** Child threads to collect. When omitted, every running child of the lead. */ + readonly threadIds?: ReadonlyArray | undefined; + /** Lead thread, used to scope an omitted `threadIds` to its own children. */ + readonly parentThreadId: ThreadId; + /** Max time to wait for still-running children before returning. */ + readonly waitMs: number; +} + +export interface TaskOrchestratorShape { + /** + * Spawn a child thread per sub-task and start its turn. Returns immediately + * with a handle per task (`status: "running"`, or `"error"` if spawning + * failed). Never blocks on completion. + */ + readonly startTasks: (input: TaskOrchestratorStartInput) => Effect.Effect; + /** + * Poll the given child threads up to `waitMs` and return their results; + * children still running at the deadline come back as `status: "running"`. + */ + readonly collectTasks: ( + input: TaskOrchestratorCollectInput, + ) => Effect.Effect; + /** True if `threadId` was spawned as a delegated child during this process. */ + readonly isDelegatedChild: (threadId: ThreadId) => Effect.Effect; +} + +export class TaskOrchestrator extends Context.Service()( + "t3/orchestration/Services/TaskOrchestrator", +) {} diff --git a/apps/server/src/orchestration/TaskModelRouter.test.ts b/apps/server/src/orchestration/TaskModelRouter.test.ts new file mode 100644 index 00000000000..0cd4d78ffaa --- /dev/null +++ b/apps/server/src/orchestration/TaskModelRouter.test.ts @@ -0,0 +1,92 @@ +import { + type DelegateTaskInput, + type ModelSelection, + ProviderInstanceId, + type TaskRoutingSettings, +} from "@t3tools/contracts"; +import { describe, expect, it } from "vite-plus/test"; + +import { resolveTaskModelSelection } from "./TaskModelRouter.ts"; + +const sel = (instanceId: string, model: string): ModelSelection => ({ + instanceId: ProviderInstanceId.make(instanceId), + model, +}); + +const parent = sel("codex", "gpt-5.4"); + +const task = (overrides: Partial = {}): DelegateTaskInput => ({ + prompt: "do the thing", + ...overrides, +}); + +const routing = (settings: Partial): TaskRoutingSettings => ({ + rules: [], + ...settings, +}); + +describe("resolveTaskModelSelection", () => { + it("uses the explicit per-task modelSelection above everything else", () => { + const explicit = sel("claudeAgent", "claude-opus-4-8"); + const result = resolveTaskModelSelection( + task({ modelSelection: explicit, modelHint: "cheap" }), + { + parentModelSelection: parent, + routing: routing({ + rules: [{ when: { modelHint: "cheap" }, use: sel("codex", "gpt-5.4-mini") }], + default: sel("codex", "gpt-5.4-nano"), + }), + }, + ); + expect(result).toEqual(explicit); + }); + + it("routes by the first matching rule (modelHint)", () => { + const strong = sel("claudeAgent", "claude-opus-4-8"); + const result = resolveTaskModelSelection(task({ modelHint: "strong" }), { + parentModelSelection: parent, + routing: routing({ + rules: [ + { when: { modelHint: "cheap" }, use: sel("codex", "gpt-5.4-mini") }, + { when: { modelHint: "strong" }, use: strong }, + ], + }), + }); + expect(result).toEqual(strong); + }); + + it("matches a rule by case-insensitive label substring", () => { + const docsModel = sel("codex", "gpt-5.4-mini"); + const result = resolveTaskModelSelection(task({ label: "Write DOCS for module" }), { + parentModelSelection: parent, + routing: routing({ rules: [{ when: { labelMatches: "docs" }, use: docsModel }] }), + }); + expect(result).toEqual(docsModel); + }); + + it("falls back to routing.default when no rule matches", () => { + const fallback = sel("codex", "gpt-5.4-nano"); + const result = resolveTaskModelSelection(task({ modelHint: "balanced" }), { + parentModelSelection: parent, + routing: routing({ + rules: [{ when: { modelHint: "cheap" }, use: sel("codex", "gpt-5.4-mini") }], + default: fallback, + }), + }); + expect(result).toEqual(fallback); + }); + + it("falls back to the parent model when there is no routing config", () => { + const result = resolveTaskModelSelection(task(), { parentModelSelection: parent }); + expect(result).toEqual(parent); + }); + + it("skips candidates whose instance is disabled", () => { + const disabled = sel("brokenInstance", "whatever"); + const result = resolveTaskModelSelection(task({ modelSelection: disabled }), { + parentModelSelection: parent, + isInstanceEnabled: (instanceId) => instanceId !== "brokenInstance", + }); + expect(result).toEqual(parent); + }); +}); diff --git a/apps/server/src/orchestration/TaskModelRouter.ts b/apps/server/src/orchestration/TaskModelRouter.ts new file mode 100644 index 00000000000..90a7593054f --- /dev/null +++ b/apps/server/src/orchestration/TaskModelRouter.ts @@ -0,0 +1,70 @@ +import type { + DelegateTaskInput, + ModelSelection, + TaskRoutingRule, + TaskRoutingSettings, +} from "@t3tools/contracts"; + +/** + * Context the router needs beyond the task itself: the lead thread's model (the + * ultimate fallback), the configured routing rules, and an optional predicate + * that reports whether a provider instance is currently usable. + */ +export interface TaskModelRouterContext { + readonly parentModelSelection: ModelSelection; + readonly routing?: TaskRoutingSettings | undefined; + /** Returns false for instances that are disabled/uninstalled so we skip them. */ + readonly isInstanceEnabled?: ((instanceId: string) => boolean) | undefined; +} + +const ruleMatches = (when: TaskRoutingRule["when"], task: DelegateTaskInput): boolean => { + if (when.modelHint !== undefined && when.modelHint !== task.modelHint) { + return false; + } + if (when.labelMatches !== undefined) { + const label = task.label?.toLowerCase() ?? ""; + if (!label.includes(when.labelMatches.toLowerCase())) { + return false; + } + } + return true; +}; + +/** + * Resolve which model a delegated sub-task should run on, using this priority + * chain (mirrors the app's existing default-resolution order): + * + * 1. Explicit `task.modelSelection` (manual override). + * 2. First matching routing rule's `use`. + * 3. `routing.default`. + * 4. The lead thread's model selection. + * + * Candidates whose instance is reported disabled are skipped; if every + * candidate is disabled we still return the parent's selection so the sub-task + * can run rather than fail to start. + */ +export const resolveTaskModelSelection = ( + task: DelegateTaskInput, + context: TaskModelRouterContext, +): ModelSelection => { + const candidates: Array = []; + + if (task.modelSelection !== undefined) { + candidates.push(task.modelSelection); + } + + const matchedRule = context.routing?.rules.find((rule) => ruleMatches(rule.when, task)); + if (matchedRule !== undefined) { + candidates.push(matchedRule.use); + } + + if (context.routing?.default !== undefined) { + candidates.push(context.routing.default); + } + + candidates.push(context.parentModelSelection); + + const isEnabled = context.isInstanceEnabled ?? (() => true); + const usable = candidates.find((candidate) => isEnabled(candidate.instanceId)); + return usable ?? context.parentModelSelection; +}; diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 0d4af771ca8..26feda39f8a 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -239,6 +239,10 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" interactionMode: command.interactionMode, branch: command.branch, worktreePath: command.worktreePath, + ...(command.parentThreadId !== undefined + ? { parentThreadId: command.parentThreadId } + : {}), + ...(command.taskLabel !== undefined ? { taskLabel: command.taskLabel } : {}), createdAt: command.createdAt, updatedAt: command.createdAt, }, diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index fc6ab8f6fcf..38abc6984f5 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -282,6 +282,8 @@ export function projectEvent( interactionMode: payload.interactionMode, branch: payload.branch, worktreePath: payload.worktreePath, + parentThreadId: payload.parentThreadId ?? null, + taskLabel: payload.taskLabel ?? null, latestTurn: null, createdAt: payload.createdAt, updatedAt: payload.updatedAt, diff --git a/apps/server/src/persistence/Layers/ProjectionRepositories.test.ts b/apps/server/src/persistence/Layers/ProjectionRepositories.test.ts index a2069e62a14..e8651c17bbc 100644 --- a/apps/server/src/persistence/Layers/ProjectionRepositories.test.ts +++ b/apps/server/src/persistence/Layers/ProjectionRepositories.test.ts @@ -87,6 +87,8 @@ projectionRepositoriesLayer("Projection repositories", (it) => { interactionMode: "default", branch: null, worktreePath: null, + parentThreadId: null, + taskLabel: null, latestTurnId: null, createdAt: "2026-03-24T00:00:00.000Z", updatedAt: "2026-03-24T00:00:00.000Z", diff --git a/apps/server/src/persistence/Layers/ProjectionThreads.ts b/apps/server/src/persistence/Layers/ProjectionThreads.ts index 1baeb375c15..19a19a1c431 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreads.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreads.ts @@ -39,6 +39,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { interaction_mode, branch, worktree_path, + parent_thread_id, + task_label, latest_turn_id, created_at, updated_at, @@ -58,6 +60,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { ${row.interactionMode}, ${row.branch}, ${row.worktreePath}, + ${row.parentThreadId}, + ${row.taskLabel}, ${row.latestTurnId}, ${row.createdAt}, ${row.updatedAt}, @@ -77,6 +81,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { interaction_mode = excluded.interaction_mode, branch = excluded.branch, worktree_path = excluded.worktree_path, + parent_thread_id = excluded.parent_thread_id, + task_label = excluded.task_label, latest_turn_id = excluded.latest_turn_id, created_at = excluded.created_at, updated_at = excluded.updated_at, @@ -103,6 +109,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { interaction_mode AS "interactionMode", branch, worktree_path AS "worktreePath", + parent_thread_id AS "parentThreadId", + task_label AS "taskLabel", latest_turn_id AS "latestTurnId", created_at AS "createdAt", updated_at AS "updatedAt", @@ -131,6 +139,8 @@ const makeProjectionThreadRepository = Effect.gen(function* () { interaction_mode AS "interactionMode", branch, worktree_path AS "worktreePath", + parent_thread_id AS "parentThreadId", + task_label AS "taskLabel", latest_turn_id AS "latestTurnId", created_at AS "createdAt", updated_at AS "updatedAt", diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index ba1131ee259..ee05605c626 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -45,6 +45,7 @@ import Migration0029 from "./Migrations/029_ProjectionThreadDetailOrderingIndexe import Migration0030 from "./Migrations/030_ProjectionThreadShellArchiveIndexes.ts"; import Migration0031 from "./Migrations/031_AuthAuthorizationScopes.ts"; import Migration0032 from "./Migrations/032_AuthPairingProofKeyThumbprint.ts"; +import Migration0033 from "./Migrations/033_ProjectionThreadDelegation.ts"; /** * Migration loader with all migrations defined inline. @@ -89,6 +90,7 @@ export const migrationEntries = [ [30, "ProjectionThreadShellArchiveIndexes", Migration0030], [31, "AuthAuthorizationScopes", Migration0031], [32, "AuthPairingProofKeyThumbprint", Migration0032], + [33, "ProjectionThreadDelegation", Migration0033], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/033_ProjectionThreadDelegation.ts b/apps/server/src/persistence/Migrations/033_ProjectionThreadDelegation.ts new file mode 100644 index 00000000000..4bcbb2b1302 --- /dev/null +++ b/apps/server/src/persistence/Migrations/033_ProjectionThreadDelegation.ts @@ -0,0 +1,30 @@ +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as Effect from "effect/Effect"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const columns = yield* sql<{ readonly name: string }>` + PRAGMA table_info(projection_threads) + `; + + if (!columns.some((column) => column.name === "parent_thread_id")) { + yield* sql` + ALTER TABLE projection_threads + ADD COLUMN parent_thread_id TEXT + `; + } + + if (!columns.some((column) => column.name === "task_label")) { + yield* sql` + ALTER TABLE projection_threads + ADD COLUMN task_label TEXT + `; + } + + // Speeds up grouping a lead thread's delegated children in the sidebar. + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_threads_parent + ON projection_threads(parent_thread_id) + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionThreads.ts b/apps/server/src/persistence/Services/ProjectionThreads.ts index 44fdc147a4a..14a59369a99 100644 --- a/apps/server/src/persistence/Services/ProjectionThreads.ts +++ b/apps/server/src/persistence/Services/ProjectionThreads.ts @@ -32,6 +32,8 @@ export const ProjectionThread = Schema.Struct({ interactionMode: ProviderInteractionMode, branch: Schema.NullOr(Schema.String), worktreePath: Schema.NullOr(Schema.String), + parentThreadId: Schema.NullOr(ThreadId), + taskLabel: Schema.NullOr(Schema.String), latestTurnId: Schema.NullOr(TurnId), createdAt: IsoDateTime, updatedAt: IsoDateTime, diff --git a/apps/server/src/server.ts b/apps/server/src/server.ts index 81d0013b20c..260509f64c0 100644 --- a/apps/server/src/server.ts +++ b/apps/server/src/server.ts @@ -82,6 +82,7 @@ import * as ProcessDiagnostics from "./diagnostics/ProcessDiagnostics.ts"; import * as ProcessResourceMonitor from "./diagnostics/ProcessResourceMonitor.ts"; import * as TraceDiagnostics from "./diagnostics/TraceDiagnostics.ts"; import { OrchestrationLayerLive } from "./orchestration/runtimeLayer.ts"; +import { layer as TaskOrchestratorLive } from "./orchestration/Layers/TaskOrchestrator.ts"; import { clearPersistedServerRuntimeState, makePersistedServerRuntimeState, @@ -356,7 +357,13 @@ export const makeRoutesLayer = Layer.mergeAll( staticAndDevRouteLayer, websocketRpcRouteLayer, ), - McpHttpServer.layer.pipe(Layer.provide(McpSessionRegistry.layer)), + McpHttpServer.layer.pipe( + Layer.provide(McpSessionRegistry.layer), + // TaskOrchestrator backs the `delegate_tasks` MCP tool. Its deps + // (engine, projection query, settings, crypto) are satisfied by the + // surrounding RuntimeServicesLive / platform layers. + Layer.provide(TaskOrchestratorLive), + ), ).pipe(Layer.provide(browserApiCorsLayer)); export const makeServerLayer = Layer.unwrap( diff --git a/apps/web/src/components/Sidebar.logic.test.ts b/apps/web/src/components/Sidebar.logic.test.ts index 574e33d4dab..0bbb9a8c98d 100644 --- a/apps/web/src/components/Sidebar.logic.test.ts +++ b/apps/web/src/components/Sidebar.logic.test.ts @@ -6,6 +6,7 @@ import { resolveAdjacentThreadId, getFallbackThreadIdAfterDelete, getVisibleThreadsForProject, + nestDelegatedChildren, getProjectSortTimestamp, hasUnseenCompletion, isContextMenuPointerDown, @@ -1066,3 +1067,37 @@ describe("sortProjectsForSidebar", () => { expect(timestamp).toBe(Date.parse("2026-03-09T10:10:00.000Z")); }); }); + +describe("nestDelegatedChildren", () => { + const t = (id: string, parentThreadId?: string | null) => ({ id, parentThreadId }); + + it("places each child directly after its lead, preserving lead order", () => { + const ordered = nestDelegatedChildren([ + t("lead-a"), + t("lead-b"), + t("child-b1", "lead-b"), + t("child-a1", "lead-a"), + ]); + expect(ordered.map((thread) => thread.id)).toEqual([ + "lead-a", + "child-a1", + "lead-b", + "child-b1", + ]); + }); + + it("keeps a child's relative order under its lead", () => { + const ordered = nestDelegatedChildren([t("lead"), t("child-1", "lead"), t("child-2", "lead")]); + expect(ordered.map((thread) => thread.id)).toEqual(["lead", "child-1", "child-2"]); + }); + + it("treats a child whose lead is absent as a top-level thread", () => { + const ordered = nestDelegatedChildren([t("orphan", "missing-lead"), t("lead")]); + expect(ordered.map((thread) => thread.id)).toEqual(["orphan", "lead"]); + }); + + it("leaves plain thread lists untouched", () => { + const ordered = nestDelegatedChildren([t("a"), t("b", null), t("c")]); + expect(ordered.map((thread) => thread.id)).toEqual(["a", "b", "c"]); + }); +}); diff --git a/apps/web/src/components/Sidebar.logic.ts b/apps/web/src/components/Sidebar.logic.ts index 4e7614ed551..34fd7c42f28 100644 --- a/apps/web/src/components/Sidebar.logic.ts +++ b/apps/web/src/components/Sidebar.logic.ts @@ -442,6 +442,40 @@ export function resolveProjectStatusIndicator( return highestPriorityStatus; } +/** + * Reorder a sorted thread list so each delegated sub-task appears directly + * after its lead thread, preserving the existing sort order among lead threads + * and among each lead's children. Children whose lead is not in the list keep + * their original position (treated as a lead). Delegation is depth-1, so no + * recursion is needed. + */ +export function nestDelegatedChildren< + T extends { readonly id: string; readonly parentThreadId?: string | null | undefined }, +>(threads: readonly T[]): T[] { + const presentIds = new Set(threads.map((thread) => thread.id)); + const childrenByParent = new Map(); + const leads: T[] = []; + for (const thread of threads) { + const parentId = thread.parentThreadId ?? null; + if (parentId !== null && parentId !== thread.id && presentIds.has(parentId)) { + const siblings = childrenByParent.get(parentId) ?? []; + siblings.push(thread); + childrenByParent.set(parentId, siblings); + } else { + leads.push(thread); + } + } + const ordered: T[] = []; + for (const lead of leads) { + ordered.push(lead); + const children = childrenByParent.get(lead.id); + if (children !== undefined) { + ordered.push(...children); + } + } + return ordered; +} + export function getVisibleThreadsForProject>(input: { threads: readonly T[]; activeThreadId: T["id"] | undefined; diff --git a/apps/web/src/components/Sidebar.tsx b/apps/web/src/components/Sidebar.tsx index ce925618caa..1443d1ca708 100644 --- a/apps/web/src/components/Sidebar.tsx +++ b/apps/web/src/components/Sidebar.tsx @@ -184,6 +184,7 @@ import { resolveAdjacentThreadId, isContextMenuPointerDown, isTrailingDoubleClick, + nestDelegatedChildren, resolveProjectStatusIndicator, resolveSidebarNewThreadSeedContext, resolveSidebarNewThreadEnvMode, @@ -379,6 +380,9 @@ export const SidebarThreadRow = memo(function SidebarThreadRow(props: SidebarThr } = props; const threadRef = scopeThreadRef(thread.environmentId, thread.id); const threadKey = scopedThreadKey(threadRef); + // Delegated sub-tasks are nested under their lead thread in the list; indent + // them and show a corner marker so the grouping reads clearly. + const isDelegatedChild = thread.parentThreadId != null; const lastVisitedAt = useUiStateStore((state) => state.threadLastVisitedAtById[threadKey]); const isSelected = useThreadSelectionStore((state) => state.selectedThreadKeys.has(threadKey)); const runningTerminalIds = useThreadRunningTerminalIds({ @@ -677,7 +681,20 @@ export const SidebarThreadRow = memo(function SidebarThreadRow(props: SidebarThr onKeyDown={handleRowKeyDown} onContextMenu={handleRowContextMenu} > -
+
+ {isDelegatedChild && ( + + ⤷ + + )} {prStatus && ( thread.archivedAt === null), - threadSortOrder, + const visibleProjectThreads = nestDelegatedChildren( + sortThreads( + projectThreads.filter((thread) => thread.archivedAt === null), + threadSortOrder, + ), ); const projectStatus = resolveProjectStatusIndicator( visibleProjectThreads.map((thread) => resolveProjectThreadStatus(thread)), diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index 43270efdec7..ada7fece7ab 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -18,6 +18,7 @@ export * from "./git.ts"; export * from "./vcs.ts"; export * from "./sourceControl.ts"; export * from "./orchestration.ts"; +export * from "./taskDelegation.ts"; export * from "./editor.ts"; export * from "./project.ts"; export * from "./filesystem.ts"; diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 623fed0917b..bef629ada8a 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -352,6 +352,8 @@ export const OrchestrationThread = Schema.Struct({ ), branch: Schema.NullOr(TrimmedNonEmptyString), worktreePath: Schema.NullOr(TrimmedNonEmptyString), + parentThreadId: Schema.optional(Schema.NullOr(ThreadId)), + taskLabel: Schema.optional(Schema.NullOr(TrimmedNonEmptyString)), latestTurn: Schema.NullOr(OrchestrationLatestTurn), createdAt: IsoDateTime, updatedAt: IsoDateTime, @@ -398,6 +400,8 @@ export const OrchestrationThreadShell = Schema.Struct({ ), branch: Schema.NullOr(TrimmedNonEmptyString), worktreePath: Schema.NullOr(TrimmedNonEmptyString), + parentThreadId: Schema.optional(Schema.NullOr(ThreadId)), + taskLabel: Schema.optional(Schema.NullOr(TrimmedNonEmptyString)), latestTurn: Schema.NullOr(OrchestrationLatestTurn), createdAt: IsoDateTime, updatedAt: IsoDateTime, @@ -503,6 +507,10 @@ const ThreadCreateCommand = Schema.Struct({ ), branch: Schema.NullOr(TrimmedNonEmptyString), worktreePath: Schema.NullOr(TrimmedNonEmptyString), + // Links a delegated sub-task thread to the lead thread that spawned it. + parentThreadId: Schema.optional(ThreadId), + // Short label describing the sub-task, recorded for audit/UI grouping. + taskLabel: Schema.optional(TrimmedNonEmptyString), createdAt: IsoDateTime, }); @@ -847,6 +855,8 @@ export const ThreadCreatedPayload = Schema.Struct({ ), branch: Schema.NullOr(TrimmedNonEmptyString), worktreePath: Schema.NullOr(TrimmedNonEmptyString), + parentThreadId: Schema.optional(ThreadId), + taskLabel: Schema.optional(TrimmedNonEmptyString), createdAt: IsoDateTime, updatedAt: IsoDateTime, }); diff --git a/packages/contracts/src/settings.ts b/packages/contracts/src/settings.ts index 6ccd65533dd..a3d7590d3cd 100644 --- a/packages/contracts/src/settings.ts +++ b/packages/contracts/src/settings.ts @@ -6,6 +6,30 @@ import { TrimmedNonEmptyString, TrimmedString } from "./baseSchemas.ts"; import { DEFAULT_GIT_TEXT_GENERATION_MODEL, ProviderOptionSelections } from "./model.ts"; import { ModelSelection } from "./orchestration.ts"; import { ProviderInstanceConfig, ProviderInstanceId } from "./providerInstance.ts"; +import { TaskModelHint } from "./taskDelegation.ts"; + +// ── Task delegation routing ──────────────────────────────────── + +/** + * A single routing rule: when a delegated sub-task matches `when`, route it to + * the model in `use`. Rules are evaluated in order; the first match wins. + */ +export const TaskRoutingRule = Schema.Struct({ + when: Schema.Struct({ + modelHint: Schema.optional(TaskModelHint), + /** Case-insensitive substring matched against the sub-task label. */ + labelMatches: Schema.optional(TrimmedNonEmptyString), + }), + use: ModelSelection, +}); +export type TaskRoutingRule = typeof TaskRoutingRule.Type; + +export const TaskRoutingSettings = Schema.Struct({ + rules: Schema.Array(TaskRoutingRule).pipe(Schema.withDecodingDefault(Effect.succeed([]))), + /** Fallback model when no rule matches and the sub-task has no explicit selection. */ + default: Schema.optional(ModelSelection), +}); +export type TaskRoutingSettings = typeof TaskRoutingSettings.Type; // ── Client Settings (local-only) ─────────────────────────────── @@ -409,6 +433,10 @@ export const ServerSettings = Schema.Struct({ Schema.withDecodingDefault(Effect.succeed({})), ), observability: ObservabilitySettings.pipe(Schema.withDecodingDefault(Effect.succeed({}))), + // Rules that route delegated sub-tasks to models. Empty by default; a + // sub-task with no match and no explicit selection falls back to the lead + // thread's model. See TaskModelRouter on the server. + taskRouting: TaskRoutingSettings.pipe(Schema.withDecodingDefault(Effect.succeed({}))), }); export type ServerSettings = typeof ServerSettings.Type; diff --git a/packages/contracts/src/taskDelegation.ts b/packages/contracts/src/taskDelegation.ts new file mode 100644 index 00000000000..7f79ccff5b2 --- /dev/null +++ b/packages/contracts/src/taskDelegation.ts @@ -0,0 +1,70 @@ +import * as Schema from "effect/Schema"; + +import { ThreadId, TrimmedNonEmptyString } from "./baseSchemas.ts"; +import { ModelSelection } from "./orchestration.ts"; + +/** + * Coarse hint a lead agent can attach to a delegated sub-task so the server-side + * router can pick an appropriate model when no explicit selection is supplied. + */ +export const TaskModelHint = Schema.Literals(["cheap", "balanced", "strong"]); +export type TaskModelHint = typeof TaskModelHint.Type; + +/** A single sub-task a lead agent asks the server to run on its behalf. */ +export const DelegateTaskInput = Schema.Struct({ + /** Short human-readable label, surfaced in results and (later) UI grouping. */ + label: Schema.optional(TrimmedNonEmptyString), + /** The instruction handed to the child agent as its user message. */ + prompt: TrimmedNonEmptyString, + /** Explicit model override; wins over rules and hints when present. */ + modelSelection: Schema.optional(ModelSelection), + /** Coarse routing hint used when no explicit `modelSelection` is given. */ + modelHint: Schema.optional(TaskModelHint), +}); +export type DelegateTaskInput = typeof DelegateTaskInput.Type; + +export const DelegateTasksRequest = Schema.Struct({ + tasks: Schema.Array(DelegateTaskInput), + /** Optional cap on how many sub-tasks run at once. Server clamps to a safe max. */ + maxConcurrency: Schema.optional(Schema.Number), +}); +export type DelegateTasksRequest = typeof DelegateTasksRequest.Type; + +export const DelegateTaskResultStatus = Schema.Literals(["running", "completed", "error"]); +export type DelegateTaskResultStatus = typeof DelegateTaskResultStatus.Type; + +export const DelegateTaskResult = Schema.Struct({ + label: Schema.optional(TrimmedNonEmptyString), + /** The child thread that ran this sub-task. */ + threadId: ThreadId, + /** The model the sub-task was routed to (instanceId/model), for transparency. */ + instanceId: Schema.optional(TrimmedNonEmptyString), + model: Schema.optional(TrimmedNonEmptyString), + status: DelegateTaskResultStatus, + /** Final assistant message text on success. */ + message: Schema.optional(Schema.String), + /** Failure detail on error. */ + error: Schema.optional(Schema.String), +}); +export type DelegateTaskResult = typeof DelegateTaskResult.Type; + +export const DelegateTasksResult = Schema.Struct({ + results: Schema.Array(DelegateTaskResult), +}); +export type DelegateTasksResult = typeof DelegateTasksResult.Type; + +/** + * Request to collect results for sub-tasks that were still running when an + * earlier `delegate_tasks`/`collect_delegated_tasks` call returned. Omit + * `threadIds` to collect every still-running sub-task spawned from the calling + * thread. + */ +export const CollectDelegatedTasksRequest = Schema.Struct({ + threadIds: Schema.optional(Schema.Array(ThreadId)), +}); +export type CollectDelegatedTasksRequest = typeof CollectDelegatedTasksRequest.Type; + +/** Hard ceiling on sub-tasks per `delegate_tasks` call. */ +export const MAX_DELEGATED_TASKS = 8; +/** Default and ceiling for concurrent sub-task execution. */ +export const DEFAULT_DELEGATION_CONCURRENCY = 4; diff --git a/scripts/build-desktop-artifact.test.ts b/scripts/build-desktop-artifact.test.ts index 99aea602e8c..afa6863d4c3 100644 --- a/scripts/build-desktop-artifact.test.ts +++ b/scripts/build-desktop-artifact.test.ts @@ -83,7 +83,7 @@ it.layer(NodeServices.layer)("build-desktop-artifact", (it) => { }); it("switches desktop packaging product names to nightly for nightly builds", () => { - assert.equal(resolveDesktopProductName("0.0.17"), "T3 Code (Alpha)"); + assert.equal(resolveDesktopProductName("0.0.17"), "T3 Code Baha"); assert.equal(resolveDesktopProductName("0.0.17-nightly.20260413.42"), "T3 Code (Nightly)"); }); @@ -276,7 +276,7 @@ it.layer(NodeServices.layer)("build-desktop-artifact", (it) => { }); assert.deepStrictEqual(configuration, { - appId: "com.t3tools.t3code", + appId: "com.t3tools.t3code.baha", teamId: "ABC1234567", rpDomains: ["example.clerk.accounts.dev"], provisioningProfilePath: "/tmp/t3code.provisionprofile", @@ -296,7 +296,7 @@ it.layer(NodeServices.layer)("build-desktop-artifact", (it) => { "clerk.example.com", "example.clerk.accounts.dev", ]); - assert.include(entitlements, "ABC1234567.com.t3tools.t3code"); + assert.include(entitlements, "ABC1234567.com.t3tools.t3code.baha"); assert.include(entitlements, "webcredentials:clerk.example.com"); assert.include(entitlements, "webcredentials:example.clerk.accounts.dev"); assert.include(entitlements, "com.apple.security.cs.allow-jit"); @@ -391,7 +391,7 @@ it.layer(NodeServices.layer)("build-desktop-artifact", (it) => { }); const mac = config.mac as Record; - assert.equal(config.appId, "com.t3tools.t3code"); + assert.equal(config.appId, "com.t3tools.t3code.baha"); assert.equal(mac.entitlements, "/tmp/entitlements.mac.plist"); assert.equal(mac.provisioningProfile, "/tmp/t3code.provisionprofile"); assert.deepStrictEqual(mac.protocols, [ diff --git a/scripts/build-desktop-artifact.ts b/scripts/build-desktop-artifact.ts index 5a6cbfb8be3..1bea0abd83a 100644 --- a/scripts/build-desktop-artifact.ts +++ b/scripts/build-desktop-artifact.ts @@ -30,7 +30,7 @@ import { Command, Flag } from "effect/unstable/cli"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; const LINUX_ICON_SIZES = [16, 22, 24, 32, 48, 64, 128, 256, 512] as const; -const DESKTOP_APP_ID = "com.t3tools.t3code"; +const DESKTOP_APP_ID = "com.t3tools.t3code.baha"; const APPLE_TEAM_ID_PATTERN = /^[A-Z0-9]{10}$/u; const BuildPlatform = Schema.Literals(["mac", "linux", "win"]);