Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,136 changes: 1,136 additions & 0 deletions apps/server/src/legacyStateImport.ts

Large diffs are not rendered by default.

15 changes: 0 additions & 15 deletions apps/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
type RuntimeMode,
type ServerConfigShape,
} from "./config";
import { migrateLegacyHomeIfNeeded } from "./homeMigration";
import { fixPath, resolveBaseDir } from "./os-jank";
import { Open } from "./open";
import * as SqlitePersistence from "./persistence/Layers/Sqlite";
Expand Down Expand Up @@ -155,20 +154,6 @@ const ServerConfigLive = (input: CliInput) =>

const devUrl = Option.getOrElse(input.devUrl, () => env.devUrl);
const baseDir = yield* resolveBaseDir(Option.getOrUndefined(input.t3Home) ?? env.t3Home);
// Import legacy ~/.t3 state before runtime paths are derived under ~/.dpcode.
yield* migrateLegacyHomeIfNeeded({
baseDir,
homeDir: OS.homedir(),
devUrl,
}).pipe(
Effect.mapError(
(cause) =>
new StartupError({
message: "Failed to migrate legacy T3 home directory",
cause,
}),
),
);
const derivedPaths = yield* deriveServerPaths(baseDir, devUrl);
const noBrowser = resolveBooleanFlag(input.noBrowser, env.noBrowser ?? mode === "desktop");
const authToken = Option.getOrUndefined(input.authToken) ?? env.authToken;
Expand Down
202 changes: 149 additions & 53 deletions apps/server/src/orchestration/Layers/ProjectionPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import {
ApprovalRequestId,
type ChatAttachment,
type OrchestrationEvent,
type ThreadId,
type TurnId,
} from "@t3tools/contracts";
import * as NodeServices from "@effect/platform-node/NodeServices";
import { Effect, FileSystem, Layer, Option, Path, Stream } from "effect";
Expand Down Expand Up @@ -131,6 +133,67 @@ function extractActivityRequestId(payload: unknown): ApprovalRequestId | null {
return typeof requestId === "string" ? ApprovalRequestId.makeUnsafe(requestId) : null;
}

const applyPendingApprovalActivityProjection = Effect.fn(function* (input: {
readonly projectionPendingApprovalRepository: ProjectionPendingApprovalRepositoryShape;
readonly threadId: ThreadId;
readonly activity: {
readonly turnId: TurnId | null;
readonly kind: string;
readonly payload: unknown;
readonly createdAt: string;
};
readonly fallbackRequestId?: ApprovalRequestId | null;
}) {
const requestId =
extractActivityRequestId(input.activity.payload) ?? input.fallbackRequestId ?? null;
if (requestId === null) {
return;
}

const existingRow = yield* input.projectionPendingApprovalRepository.getByRequestId({
requestId,
});
if (input.activity.kind === "approval.resolved") {
const resolvedDecisionRaw =
typeof input.activity.payload === "object" &&
input.activity.payload !== null &&
"decision" in input.activity.payload
? (input.activity.payload as { decision?: unknown }).decision
: null;
const resolvedDecision =
resolvedDecisionRaw === "accept" ||
resolvedDecisionRaw === "acceptForSession" ||
resolvedDecisionRaw === "decline" ||
resolvedDecisionRaw === "cancel"
? resolvedDecisionRaw
: null;
yield* input.projectionPendingApprovalRepository.upsert({
requestId,
threadId: Option.isSome(existingRow) ? existingRow.value.threadId : input.threadId,
turnId: Option.isSome(existingRow) ? existingRow.value.turnId : input.activity.turnId,
status: "resolved",
decision: resolvedDecision,
createdAt: Option.isSome(existingRow)
? existingRow.value.createdAt
: input.activity.createdAt,
resolvedAt: input.activity.createdAt,
});
return;
}
if (Option.isSome(existingRow) && existingRow.value.status === "resolved") {
return;
}
yield* input.projectionPendingApprovalRepository.upsert({
requestId,
threadId: input.threadId,
turnId: input.activity.turnId,
status: "pending",
decision: null,
createdAt: Option.isSome(existingRow) ? existingRow.value.createdAt : input.activity.createdAt,
resolvedAt: null,
});
});

// Recompute the denormalized sidebar shell summary after per-thread timeline changes.
const withRefreshedThreadShellSummary = Effect.fn(function* (input: {
readonly thread: ProjectionThread;
Expand Down Expand Up @@ -513,6 +576,9 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
}
yield* projectionThreadRepository.upsert({
...existingRow.value,
...(event.payload.projectId !== undefined
? { projectId: event.payload.projectId }
: {}),
...(event.payload.title !== undefined ? { title: event.payload.title } : {}),
...(event.payload.modelSelection !== undefined
? { modelSelection: event.payload.modelSelection }
Expand Down Expand Up @@ -631,8 +697,11 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
}

case "thread.message-sent":
case "thread.messages-imported":
case "thread.proposed-plan-upserted":
case "thread.proposed-plans-imported":
case "thread.activity-appended":
case "thread.activities-imported":
case "thread.approval-response-requested":
case "thread.reverted": {
const existingRow = yield* projectionThreadRepository.getById({
Expand Down Expand Up @@ -751,6 +820,31 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
return;
}

case "thread.messages-imported":
yield* Effect.forEach(
event.payload.messages,
(message) =>
projectionThreadMessageRepository.upsert({
messageId: message.messageId,
threadId: event.payload.threadId,
turnId: message.turnId ?? null,
role: message.role,
text: message.text,
...(message.attachments !== undefined ? { attachments: message.attachments } : {}),
...(message.skills !== undefined ? { skills: message.skills } : {}),
...(message.mentions !== undefined ? { mentions: message.mentions } : {}),
...(message.dispatchMode !== undefined
? { dispatchMode: message.dispatchMode }
: {}),
isStreaming: message.streaming ?? false,
source: message.source ?? "native",
createdAt: message.createdAt,
updatedAt: message.updatedAt,
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);
return;

case "thread.reverted": {
const existingRows = yield* projectionThreadMessageRepository.listByThreadId({
threadId: event.payload.threadId,
Expand Down Expand Up @@ -808,6 +902,24 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
});
return;

case "thread.proposed-plans-imported":
yield* Effect.forEach(
event.payload.proposedPlans,
(proposedPlan) =>
projectionThreadProposedPlanRepository.upsert({
planId: proposedPlan.id,
threadId: event.payload.threadId,
turnId: proposedPlan.turnId,
planMarkdown: proposedPlan.planMarkdown,
implementedAt: proposedPlan.implementedAt,
implementationThreadId: proposedPlan.implementationThreadId,
createdAt: proposedPlan.createdAt,
updatedAt: proposedPlan.updatedAt,
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);
return;

case "thread.reverted": {
const existingRows = yield* projectionThreadProposedPlanRepository.listByThreadId({
threadId: event.payload.threadId,
Expand Down Expand Up @@ -864,6 +976,25 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
});
return;

case "thread.activities-imported":
yield* Effect.forEach(
event.payload.activities,
(activity) =>
projectionThreadActivityRepository.upsert({
activityId: activity.id,
threadId: event.payload.threadId,
turnId: activity.turnId,
tone: activity.tone,
kind: activity.kind,
summary: activity.summary,
payload: activity.payload,
...(activity.sequence !== undefined ? { sequence: activity.sequence } : {}),
createdAt: activity.createdAt,
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);
return;

case "thread.reverted": {
const existingRows = yield* projectionThreadActivityRepository.listByThreadId({
threadId: event.payload.threadId,
Expand Down Expand Up @@ -1225,64 +1356,29 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () {
Effect.gen(function* () {
switch (event.type) {
case "thread.activity-appended": {
const requestId =
extractActivityRequestId(event.payload.activity.payload) ??
event.metadata.requestId ??
null;
if (requestId === null) {
return;
}
const existingRow = yield* projectionPendingApprovalRepository.getByRequestId({
requestId,
});
if (event.payload.activity.kind === "approval.resolved") {
const resolvedDecisionRaw =
typeof event.payload.activity.payload === "object" &&
event.payload.activity.payload !== null &&
"decision" in event.payload.activity.payload
? (event.payload.activity.payload as { decision?: unknown }).decision
: null;
const resolvedDecision =
resolvedDecisionRaw === "accept" ||
resolvedDecisionRaw === "acceptForSession" ||
resolvedDecisionRaw === "decline" ||
resolvedDecisionRaw === "cancel"
? resolvedDecisionRaw
: null;
yield* projectionPendingApprovalRepository.upsert({
requestId,
threadId: Option.isSome(existingRow)
? existingRow.value.threadId
: event.payload.threadId,
turnId: Option.isSome(existingRow)
? existingRow.value.turnId
: event.payload.activity.turnId,
status: "resolved",
decision: resolvedDecision,
createdAt: Option.isSome(existingRow)
? existingRow.value.createdAt
: event.payload.activity.createdAt,
resolvedAt: event.payload.activity.createdAt,
});
return;
}
if (Option.isSome(existingRow) && existingRow.value.status === "resolved") {
return;
}
yield* projectionPendingApprovalRepository.upsert({
requestId,
yield* applyPendingApprovalActivityProjection({
projectionPendingApprovalRepository,
threadId: event.payload.threadId,
turnId: event.payload.activity.turnId,
status: "pending",
decision: null,
createdAt: Option.isSome(existingRow)
? existingRow.value.createdAt
: event.payload.activity.createdAt,
resolvedAt: null,
activity: event.payload.activity,
fallbackRequestId: event.metadata.requestId ?? null,
});
return;
}

case "thread.activities-imported": {
yield* Effect.forEach(
event.payload.activities,
(activity) =>
applyPendingApprovalActivityProjection({
projectionPendingApprovalRepository,
threadId: event.payload.threadId,
activity,
}),
{ concurrency: 1 },
).pipe(Effect.asVoid);
return;
}

case "thread.approval-response-requested": {
const existingRow = yield* projectionPendingApprovalRepository.getByRequestId({
requestId: event.payload.requestId,
Expand Down
6 changes: 6 additions & 0 deletions apps/server/src/orchestration/Schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import {
ThreadMessageSentPayload as ContractsThreadMessageSentPayloadSchema,
ThreadProposedPlanUpsertedPayload as ContractsThreadProposedPlanUpsertedPayloadSchema,
ThreadSessionSetPayload as ContractsThreadSessionSetPayloadSchema,
ThreadMessagesImportedPayload as ContractsThreadMessagesImportedPayloadSchema,
ThreadProposedPlansImportedPayload as ContractsThreadProposedPlansImportedPayloadSchema,
ThreadActivitiesImportedPayload as ContractsThreadActivitiesImportedPayloadSchema,
ThreadTurnDiffCompletedPayload as ContractsThreadTurnDiffCompletedPayloadSchema,
ThreadRevertedPayload as ContractsThreadRevertedPayloadSchema,
ThreadActivityAppendedPayload as ContractsThreadActivityAppendedPayloadSchema,
Expand Down Expand Up @@ -41,6 +44,9 @@ export const ThreadSessionSetPayload = ContractsThreadSessionSetPayloadSchema;
export const ThreadTurnDiffCompletedPayload = ContractsThreadTurnDiffCompletedPayloadSchema;
export const ThreadRevertedPayload = ContractsThreadRevertedPayloadSchema;
export const ThreadActivityAppendedPayload = ContractsThreadActivityAppendedPayloadSchema;
export const ThreadMessagesImportedPayload = ContractsThreadMessagesImportedPayloadSchema;
export const ThreadProposedPlansImportedPayload = ContractsThreadProposedPlansImportedPayloadSchema;
export const ThreadActivitiesImportedPayload = ContractsThreadActivitiesImportedPayloadSchema;

export const ThreadTurnStartRequestedPayload = ContractsThreadTurnStartRequestedPayloadSchema;
export const ThreadTurnInterruptRequestedPayload =
Expand Down
Loading