diff --git a/apps/code/src/main/services/cloud-task/service.test.ts b/apps/code/src/main/services/cloud-task/service.test.ts index d1032cd15..fc9920a39 100644 --- a/apps/code/src/main/services/cloud-task/service.test.ts +++ b/apps/code/src/main/services/cloud-task/service.test.ts @@ -446,8 +446,9 @@ describe("CloudTaskService", () => { ); expect(mockStreamFetch.mock.calls.length).toBe(6); - // 2 bootstrap calls + 6 handleStreamCompletion calls (one per stream error) - expect(mockNetFetch).toHaveBeenCalledTimes(8); + // 2 bootstrap calls + 1 post-bootstrap status verification + 6 + // handleStreamCompletion calls (one per stream error) + expect(mockNetFetch).toHaveBeenCalledTimes(9); expect(updates).toContainEqual({ taskId: "task-1", runId: "run-1", diff --git a/apps/code/src/main/services/cloud-task/service.ts b/apps/code/src/main/services/cloud-task/service.ts index 01b9f2326..42ae9a384 100644 --- a/apps/code/src/main/services/cloud-task/service.ts +++ b/apps/code/src/main/services/cloud-task/service.ts @@ -548,6 +548,33 @@ export class CloudTaskService extends TypedEventEmitter { watcher.needsPostBootstrapReconnect = false; this.scheduleReconnect(key); } + + void this.verifyPostBootstrapStatus(key); + } + + private async verifyPostBootstrapStatus(key: string): Promise { + const watcher = this.watchers.get(key); + if (!watcher) return; + if (isTerminalStatus(watcher.lastStatus)) return; + + const run = await this.fetchTaskRun(watcher); + const currentWatcher = this.watchers.get(key); + if (!currentWatcher || currentWatcher !== watcher) return; + if (!run) return; + + if (!this.applyTaskRunState(watcher, run)) return; + if (isTerminalStatus(watcher.lastStatus)) return; + + this.emit(CloudTaskEvent.Update, { + taskId: watcher.taskId, + runId: watcher.runId, + kind: "status", + status: watcher.lastStatus ?? undefined, + stage: watcher.lastStage, + output: watcher.lastOutput, + errorMessage: watcher.lastErrorMessage, + branch: watcher.lastBranch, + }); } private async connectSse( diff --git a/apps/code/src/main/services/handoff/handoff-saga.test.ts b/apps/code/src/main/services/handoff/handoff-saga.test.ts index c7221e874..8a8067859 100644 --- a/apps/code/src/main/services/handoff/handoff-saga.test.ts +++ b/apps/code/src/main/services/handoff/handoff-saga.test.ts @@ -33,19 +33,6 @@ function createInput( }; } -function createSnapshot( - overrides: Partial = {}, -): AgentTypes.TreeSnapshotEvent { - return { - treeHash: "abc123", - baseCommit: "def456", - archiveUrl: "https://s3.example.com/archive.tar.gz", - changes: [{ path: "test.txt", status: "A" }], - timestamp: "2026-04-07T00:00:00Z", - ...overrides, - }; -} - function createCheckpoint( overrides: Partial = {}, ): AgentTypes.GitCheckpointEvent { @@ -75,7 +62,6 @@ function createDeps(overrides: Partial = {}): HandoffSagaDeps { }), updateTaskRun: vi.fn().mockResolvedValue({}), }), - applyTreeSnapshot: vi.fn().mockResolvedValue(undefined), applyGitCheckpoint: vi.fn().mockResolvedValue(undefined), updateWorkspaceMode: vi.fn(), reconnectSession: vi.fn().mockResolvedValue({ @@ -96,7 +82,6 @@ function createResumeState( ): AgentResume.ResumeState { return { conversation: [], - latestSnapshot: null, latestGitCheckpoint: null, interrupted: false, logEntryCount: 0, @@ -130,14 +115,14 @@ describe("HandoffSaga", () => { mockFormatConversation.mockReturnValue("conversation summary"); }); - it("completes happy path with snapshot", async () => { - const snapshot = createSnapshot(); + it("completes happy path with checkpoint", async () => { + const checkpoint = createCheckpoint(); const { result } = await runSaga({ resumeState: { conversation: [ { role: "user", content: [{ type: "text", text: "hello" }] }, ], - latestSnapshot: snapshot, + latestGitCheckpoint: checkpoint, logEntryCount: 10, }, }); @@ -145,7 +130,7 @@ describe("HandoffSaga", () => { expect(result.success).toBe(true); if (!result.success) return; expect(result.data.sessionId).toBe("session-1"); - expect(result.data.snapshotApplied).toBe(true); + expect(result.data.checkpointApplied).toBe(true); expect(result.data.conversationTurns).toBe(1); }); @@ -165,27 +150,15 @@ describe("HandoffSaga", () => { expect(closeOrder).toBeLessThan(fetchOrder); }); - it("skips snapshot apply when no archiveUrl", async () => { + it("skips checkpoint apply when no checkpoint is present", async () => { const { deps, result } = await runSaga({ - resumeState: { - latestSnapshot: createSnapshot({ archiveUrl: undefined }), - logEntryCount: 5, - }, + resumeState: { logEntryCount: 5 }, }); expect(result.success).toBe(true); if (!result.success) return; - expect(result.data.snapshotApplied).toBe(false); - expect(deps.applyTreeSnapshot).not.toHaveBeenCalled(); - }); - - it("skips snapshot apply when no snapshot at all", async () => { - const { deps, result } = await runSaga(); - - expect(result.success).toBe(true); - if (!result.success) return; - expect(result.data.snapshotApplied).toBe(false); - expect(deps.applyTreeSnapshot).not.toHaveBeenCalled(); + expect(result.data.checkpointApplied).toBe(false); + expect(deps.applyGitCheckpoint).not.toHaveBeenCalled(); }); it("seeds local logs when cloudLogUrl is present", async () => { @@ -232,16 +205,16 @@ describe("HandoffSaga", () => { ); }); - it("context mentions files restored when snapshot applied", async () => { + it("context mentions files restored when checkpoint applied", async () => { const { deps } = await runSaga({ resumeState: { - latestSnapshot: createSnapshot(), + latestGitCheckpoint: createCheckpoint(), }, }); expect(deps.setPendingContext).toHaveBeenCalledWith( "run-1", - expect.stringContaining("fully restored"), + expect.stringContaining("restored from the cloud session checkpoint"), ); }); @@ -262,14 +235,12 @@ describe("HandoffSaga", () => { const { deps } = await runSaga({ resumeState: { latestGitCheckpoint: createCheckpoint(), - latestSnapshot: createSnapshot(), }, }); expect(getProgressSteps(deps)).toEqual([ "fetching_logs", "applying_git_checkpoint", - "applying_snapshot", "spawning_agent", "complete", ]); @@ -318,11 +289,10 @@ describe("HandoffSaga", () => { }); }); - it("applies git checkpoint before restoring the file snapshot", async () => { + it("applies git checkpoint with local git state during handoff", async () => { const { deps, result } = await runSaga({ input: { localGitState: DEFAULT_LOCAL_GIT_STATE }, resumeState: { - latestSnapshot: createSnapshot(), latestGitCheckpoint: createCheckpoint(), }, }); @@ -338,6 +308,5 @@ describe("HandoffSaga", () => { expect.any(Object), DEFAULT_LOCAL_GIT_STATE, ); - expect(deps.applyTreeSnapshot).toHaveBeenCalledTimes(1); }); }); diff --git a/apps/code/src/main/services/handoff/handoff-saga.ts b/apps/code/src/main/services/handoff/handoff-saga.ts index 0c783554f..9b00c3b39 100644 --- a/apps/code/src/main/services/handoff/handoff-saga.ts +++ b/apps/code/src/main/services/handoff/handoff-saga.ts @@ -13,18 +13,11 @@ export type HandoffSagaInput = HandoffExecuteInput; export interface HandoffSagaOutput { sessionId: string; - snapshotApplied: boolean; + checkpointApplied: boolean; conversationTurns: number; } export interface HandoffSagaDeps extends HandoffBaseDeps { - applyTreeSnapshot( - snapshot: AgentTypes.TreeSnapshotEvent, - repoPath: string, - taskId: string, - runId: string, - apiClient: PostHogAPIClient, - ): Promise; applyGitCheckpoint( checkpoint: AgentTypes.GitCheckpointEvent, repoPath: string, @@ -102,7 +95,7 @@ export class HandoffSaga extends Saga { }, ); - let filesRestored = false; + let checkpointApplied = false; const checkpoint = resumeState.latestGitCheckpoint; if (checkpoint) { this.deps.onProgress( @@ -121,29 +114,7 @@ export class HandoffSaga extends Saga { apiClient, input.localGitState, ); - }, - rollback: async () => {}, - }); - } - - const snapshot = resumeState.latestSnapshot; - if (snapshot?.archiveUrl) { - this.deps.onProgress( - "applying_snapshot", - "Applying cloud file state locally...", - ); - - await this.step({ - name: "apply_snapshot", - execute: async () => { - await this.deps.applyTreeSnapshot( - snapshot, - repoPath, - taskId, - runId, - apiClient, - ); - filesRestored = true; + checkpointApplied = true; }, rollback: async () => {}, }); @@ -198,7 +169,7 @@ export class HandoffSaga extends Saga { await this.readOnlyStep("set_context", async () => { const context = this.buildHandoffContext( resumeState.conversation, - filesRestored, + checkpointApplied, ); this.deps.setPendingContext(runId, context); }); @@ -207,20 +178,20 @@ export class HandoffSaga extends Saga { return { sessionId: agentSessionId, - snapshotApplied: filesRestored, + checkpointApplied, conversationTurns: resumeState.conversation.length, }; } private buildHandoffContext( conversation: AgentResume.ConversationTurn[], - snapshotApplied: boolean, + checkpointApplied: boolean, ): string { const conversationSummary = formatConversationForResume(conversation); - const fileStatus = snapshotApplied - ? "The workspace files have been fully restored from the cloud session." - : "The workspace files from the cloud session could not be restored. You are working with the local file state."; + const fileStatus = checkpointApplied + ? "The workspace git state and files have been restored from the cloud session checkpoint." + : "The workspace from the cloud session could not be restored from a checkpoint. You are working with the local file state."; return ( `You are resuming a previous conversation that was running in a cloud sandbox. ` + diff --git a/apps/code/src/main/services/handoff/handoff-to-cloud-saga.test.ts b/apps/code/src/main/services/handoff/handoff-to-cloud-saga.test.ts new file mode 100644 index 000000000..bb7a57099 --- /dev/null +++ b/apps/code/src/main/services/handoff/handoff-to-cloud-saga.test.ts @@ -0,0 +1,88 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + HandoffToCloudSaga, + type HandoffToCloudSagaDeps, +} from "./handoff-to-cloud-saga"; + +function createDeps( + overrides: Partial = {}, +): HandoffToCloudSagaDeps { + return { + captureGitCheckpoint: vi.fn().mockResolvedValue({ + checkpointId: "checkpoint-1", + checkpointRef: "refs/posthog-code-checkpoint/checkpoint-1", + }), + persistCheckpointToLog: vi.fn().mockResolvedValue(undefined), + countLocalLogEntries: vi.fn().mockReturnValue(7), + resumeRunInCloud: vi.fn().mockResolvedValue(undefined), + killSession: vi.fn().mockResolvedValue(undefined), + updateWorkspaceMode: vi.fn(), + onProgress: vi.fn(), + ...overrides, + } as unknown as HandoffToCloudSagaDeps; +} + +describe("HandoffToCloudSaga", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("persists the fresh checkpoint, starts cloud, then kills the local session", async () => { + const deps = createDeps(); + const order: string[] = []; + + vi.mocked(deps.persistCheckpointToLog).mockImplementation(async () => { + order.push("checkpoint"); + }); + vi.mocked(deps.resumeRunInCloud).mockImplementation(async () => { + order.push("resume"); + }); + vi.mocked(deps.killSession).mockImplementation(async () => { + order.push("kill"); + }); + + const saga = new HandoffToCloudSaga(deps); + const result = await saga.run({ + taskId: "task-1", + runId: "run-1", + repoPath: "/repo/path", + apiHost: "https://us.posthog.com", + teamId: 1, + localGitState: { + head: "head-1", + branch: "main", + upstreamHead: "upstream-head-1", + upstreamRemote: "origin", + upstreamMergeRef: "refs/heads/main", + }, + }); + + expect(result.success).toBe(true); + expect(order).toEqual(["checkpoint", "resume", "kill"]); + expect(deps.countLocalLogEntries).toHaveBeenCalledWith("run-1"); + if (result.success) { + expect(result.data.logEntryCount).toBe(7); + expect(result.data.checkpointCaptured).toBe(true); + } + }); + + it("reports logEntryCount of 0 when no local cache exists", async () => { + const deps = createDeps({ + countLocalLogEntries: vi.fn().mockReturnValue(0), + }); + + const saga = new HandoffToCloudSaga(deps); + const result = await saga.run({ + taskId: "task-1", + runId: "run-1", + repoPath: "/repo/path", + apiHost: "https://us.posthog.com", + teamId: 1, + }); + + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.logEntryCount).toBe(0); + } + }); +}); diff --git a/apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts b/apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts index 6effd7bd2..7201555a1 100644 --- a/apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts +++ b/apps/code/src/main/services/handoff/handoff-to-cloud-saga.ts @@ -6,20 +6,17 @@ export type HandoffToCloudSagaInput = HandoffToCloudExecuteInput; export interface HandoffToCloudSagaOutput { checkpointCaptured: boolean; - snapshotCaptured: boolean; - flushedLogEntryCount: number; + logEntryCount: number; } export interface HandoffToCloudSagaDeps extends HandoffBaseDeps { captureGitCheckpoint( localGitState?: AgentTypes.HandoffLocalGitState, ): Promise; - captureTreeSnapshot(): Promise; persistCheckpointToLog( checkpoint: AgentTypes.GitCheckpointEvent, ): Promise; - persistSnapshotToLog(snapshot: AgentTypes.TreeSnapshotEvent): Promise; - flushLocalLogs(): Promise; + countLocalLogEntries(runId: string): number; resumeRunInCloud(): Promise; } @@ -41,7 +38,6 @@ export class HandoffToCloudSaga extends Saga< const { taskId, runId } = input; let checkpointCaptured = false; - let snapshotCaptured = false; this.deps.onProgress( "capturing_checkpoint", @@ -52,35 +48,13 @@ export class HandoffToCloudSaga extends Saga< this.deps.captureGitCheckpoint(input.localGitState), ); - let persistedNotificationCount = 0; - if (checkpoint) { await this.readOnlyStep("persist_checkpoint_to_log", () => this.deps.persistCheckpointToLog(checkpoint), ); checkpointCaptured = true; - persistedNotificationCount++; - } - - this.deps.onProgress("capturing_snapshot", "Capturing local file state..."); - - const snapshot = await this.readOnlyStep("capture_tree_snapshot", () => - this.deps.captureTreeSnapshot(), - ); - - if (snapshot) { - await this.readOnlyStep("persist_snapshot_to_log", () => - this.deps.persistSnapshotToLog(snapshot), - ); - snapshotCaptured = true; - persistedNotificationCount++; } - const localLogLineCount = await this.readOnlyStep("flush_local_logs", () => - this.deps.flushLocalLogs(), - ); - const flushedLogEntryCount = localLogLineCount + persistedNotificationCount; - this.deps.onProgress("starting_cloud_run", "Starting cloud sandbox..."); await this.step({ @@ -95,6 +69,8 @@ export class HandoffToCloudSaga extends Saga< this.deps.killSession(runId), ); + const logEntryCount = this.deps.countLocalLogEntries(runId); + await this.step({ name: "update_workspace", execute: async () => { @@ -107,6 +83,6 @@ export class HandoffToCloudSaga extends Saga< this.deps.onProgress("complete", "Handoff to cloud complete"); - return { checkpointCaptured, snapshotCaptured, flushedLogEntryCount }; + return { checkpointCaptured, logEntryCount }; } } diff --git a/apps/code/src/main/services/handoff/schemas.ts b/apps/code/src/main/services/handoff/schemas.ts index e6d76c26d..f9675b6a6 100644 --- a/apps/code/src/main/services/handoff/schemas.ts +++ b/apps/code/src/main/services/handoff/schemas.ts @@ -97,10 +97,8 @@ export type HandoffToCloudExecuteResult = z.infer< export type HandoffStep = | "fetching_logs" | "applying_git_checkpoint" - | "applying_snapshot" | "spawning_agent" | "capturing_checkpoint" - | "capturing_snapshot" | "stopping_agent" | "starting_cloud_run" | "complete" diff --git a/apps/code/src/main/services/handoff/service.test.ts b/apps/code/src/main/services/handoff/service.test.ts index 8ed90242f..98c159c74 100644 --- a/apps/code/src/main/services/handoff/service.test.ts +++ b/apps/code/src/main/services/handoff/service.test.ts @@ -44,12 +44,6 @@ vi.mock("@posthog/agent/posthog-api", () => ({ PostHogAPIClient: vi.fn(), })); -vi.mock("@posthog/agent/tree-tracker", () => ({ - TreeTracker: vi.fn().mockImplementation(() => ({ - applyTreeSnapshot: vi.fn(), - })), -})); - vi.mock("@posthog/agent/handoff-checkpoint", () => ({ HandoffCheckpointTracker: vi.fn().mockImplementation(() => ({ applyFromHandoff: mockApplyFromHandoff, diff --git a/apps/code/src/main/services/handoff/service.ts b/apps/code/src/main/services/handoff/service.ts index ed15efaac..e1f41465a 100644 --- a/apps/code/src/main/services/handoff/service.ts +++ b/apps/code/src/main/services/handoff/service.ts @@ -1,4 +1,10 @@ -import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { + existsSync, + mkdirSync, + readFileSync, + rmSync, + writeFileSync, +} from "node:fs"; import { join } from "node:path"; import { MAIN_TOKENS } from "@main/di/tokens"; import { logger } from "@main/utils/logger"; @@ -6,7 +12,6 @@ import { TypedEventEmitter } from "@main/utils/typed-event-emitter"; import { POSTHOG_NOTIFICATIONS } from "@posthog/agent"; import { HandoffCheckpointTracker } from "@posthog/agent/handoff-checkpoint"; import { PostHogAPIClient } from "@posthog/agent/posthog-api"; -import { TreeTracker } from "@posthog/agent/tree-tracker"; import type * as AgentTypes from "@posthog/agent/types"; import { type GitHandoffBranchDivergence, @@ -99,25 +104,6 @@ export class HandoffService extends TypedEventEmitter { createApiClient: (apiHost, teamId) => this.createApiClient(apiHost, teamId), - applyTreeSnapshot: async ( - snapshot: AgentTypes.TreeSnapshotEvent, - repoPath: string, - taskId: string, - runId: string, - apiClient: PostHogAPIClient, - ) => { - const tracker = new TreeTracker({ - repositoryPath: repoPath, - taskId, - runId, - apiClient, - }); - await tracker.applyTreeSnapshot({ - ...snapshot, - baseCommit: null, - }); - }, - applyGitCheckpoint: async ( checkpoint: AgentTypes.GitCheckpointEvent, repoPath: string, @@ -258,13 +244,6 @@ export class HandoffService extends TypedEventEmitter { apiClient, }); - const treeTracker = new TreeTracker({ - repositoryPath: repoPath, - taskId, - runId, - apiClient, - }); - const appendNotification = async ( method: string, params: Record, @@ -288,67 +267,24 @@ export class HandoffService extends TypedEventEmitter { return { ...checkpoint, device: { type: "local" as const } }; }, - captureTreeSnapshot: async () => { - const snapshot = await treeTracker.captureTree({}); - if (!snapshot) return null; - return { ...snapshot, device: { type: "local" as const } }; - }, - persistCheckpointToLog: (checkpoint) => appendNotification( POSTHOG_NOTIFICATIONS.GIT_CHECKPOINT, checkpoint as unknown as Record, ), - persistSnapshotToLog: (snapshot) => - appendNotification( - POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, - snapshot as unknown as Record, - ), - - flushLocalLogs: async () => { + countLocalLogEntries: (taskRunId) => { const logPath = join( app.getPath("home"), ".posthog-code", "sessions", - runId, + taskRunId, "logs.ndjson", ); if (!existsSync(logPath)) return 0; - - const lines = readFileSync(logPath, "utf-8") + return readFileSync(logPath, "utf-8") .split("\n") - .filter((l) => l.trim()); - if (lines.length === 0) return 0; - - const boundaryIndex = lines.findIndex((l) => { - try { - return JSON.parse(l).type === "seed_boundary"; - } catch { - return false; - } - }); - const newLines = - boundaryIndex >= 0 ? lines.slice(boundaryIndex + 1) : lines; - - const entries: AgentTypes.StoredEntry[] = []; - for (const line of newLines) { - try { - entries.push(JSON.parse(line)); - } catch { - // skip - } - } - - if (entries.length > 0) { - await apiClient.appendTaskRunLog(taskId, runId, entries); - log.info("Flushed local logs to cloud", { - runId, - entries: entries.length, - }); - } - - return lines.length; + .filter((l) => l.trim()).length; }, resumeRunInCloud: async () => { @@ -388,12 +324,32 @@ export class HandoffService extends TypedEventEmitter { input.localGitState?.branch ?? null, ); + this.deleteLocalLogCache(runId); + return { success: true, - logEntryCount: result.data.flushedLogEntryCount, + logEntryCount: result.data.logEntryCount, }; } + private deleteLocalLogCache(runId: string): void { + const logPath = join( + app.getPath("home"), + ".posthog-code", + "sessions", + runId, + "logs.ndjson", + ); + try { + rmSync(logPath, { force: true }); + } catch (err) { + log.warn("Failed to delete local log cache after cloud handoff", { + runId, + err, + }); + } + } + private async cleanupLocalAfterCloudHandoff( repoPath: string, branchName: string | null, diff --git a/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts b/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts index d4e84466c..1ac6999b2 100644 --- a/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts +++ b/apps/code/src/renderer/features/task-detail/hooks/useCloudRunState.ts @@ -4,7 +4,7 @@ import { extractCloudToolChangedFiles, } from "@features/task-detail/utils/cloudToolChanges"; import { useTasks } from "@features/tasks/hooks/useTasks"; -import type { ChangedFile, Task } from "@shared/types"; +import type { Task } from "@shared/types"; import { useMemo } from "react"; export function useCloudRunState(taskId: string, task: Task) { @@ -32,14 +32,10 @@ export function useCloudRunState(taskId: string, task: Task) { const events = session?.events; const summary = useMemo(() => buildCloudEventSummary(events ?? []), [events]); - const toolCallFiles = useMemo( + const fallbackFiles = useMemo( () => extractCloudToolChangedFiles(summary.toolCalls), [summary], ); - const fallbackFiles: ChangedFile[] = - summary.treeSnapshotFiles.length > 0 - ? summary.treeSnapshotFiles - : toolCallFiles; return { freshTask, diff --git a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts index f4e7f1430..52adafe3d 100644 --- a/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts +++ b/apps/code/src/renderer/features/task-detail/utils/cloudToolChanges.ts @@ -2,8 +2,7 @@ import type { ToolCallContent, ToolCallLocation, } from "@features/sessions/types"; -import { isNotification, POSTHOG_NOTIFICATIONS } from "@posthog/agent"; -import type { ChangedFile, GitFileStatus } from "@shared/types"; +import type { ChangedFile } from "@shared/types"; import { type AcpMessage, isJsonRpcNotification, @@ -114,23 +113,15 @@ function getDiffStats( export interface CloudEventSummary { toolCalls: Map; - treeSnapshotFiles: ChangedFile[]; } -const TREE_SNAPSHOT_STATUS_MAP: Record = { - A: "added", - M: "modified", - D: "deleted", -}; - /** - * Single-pass extraction of tool calls and the last tree snapshot from events. + * Single-pass extraction of tool calls from events. */ export function buildCloudEventSummary( events: AcpMessage[], ): CloudEventSummary { const toolCalls = new Map(); - let treeSnapshotFiles: ChangedFile[] = []; for (const event of events) { const message = event.message; @@ -170,28 +161,10 @@ export function buildCloudEventSummary( const merged = mergeToolCall(toolCalls.get(toolCallId), patch); toolCalls.set(toolCallId, merged); - } else if ( - isNotification(message.method, POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT) - ) { - const params = message.params as - | { - changes?: Array<{ path: string; status: "A" | "M" | "D" }>; - } - | undefined; - const changes = params?.changes; - if (!Array.isArray(changes) || changes.length === 0) continue; - - // Overwrite — we only care about the last snapshot - treeSnapshotFiles = changes - .filter((c) => c.path && c.status in TREE_SNAPSHOT_STATUS_MAP) - .map((c) => ({ - path: c.path, - status: TREE_SNAPSHOT_STATUS_MAP[c.status], - })); } } - return { toolCalls, treeSnapshotFiles }; + return { toolCalls }; } export function extractCloudFileDiff( diff --git a/packages/agent/README.md b/packages/agent/README.md index 24603dae5..0d81a81ef 100644 --- a/packages/agent/README.md +++ b/packages/agent/README.md @@ -59,7 +59,7 @@ The same ACP agent runs in both contexts. The difference is how it's connected: **Local (PostHog Code desktop):** The agent runs in-process. PostHog Code calls `createAcpConnection()` directly — no HTTP server, no JWT. The bidirectional ACP streams connect client ↔ agent within the same process. -**TreeTracker** handles the bridge between these contexts: it captures the git working tree as snapshots (tree hash + file archive) so work can be transferred between cloud and local. This enables the "hand off" flow — start locally, continue in cloud, or vice versa. Tree snapshots use the Saga pattern (`src/sagas/`) for atomic operations with automatic rollback on failure. +**HandoffCheckpointTracker** handles the bridge between these contexts: it captures git checkpoint state plus the object pack/index needed to restore the worktree across cloud and local. This enables the "hand off" flow — start locally, continue in cloud, or vice versa. ### Permission modes @@ -108,7 +108,7 @@ start() ├─ Creates synthetic JwtPayload from CLI config ├─ configureEnvironment() — sets ANTHROPIC_BASE_URL, OPENAI_BASE_URL, etc. │ pointing at the PostHog LLM gateway - ├─ Creates TreeTracker, SessionLogWriter, PostHogAPIClient + ├─ Creates HandoffCheckpointTracker, SessionLogWriter, PostHogAPIClient ├─ createAcpConnection() — sets up ACP streams with log tapping │ ├─ Wraps client streams with a SECOND tap layer (NdJsonTap) @@ -144,9 +144,9 @@ When `POST /command` receives a `user_message`, it doesn't handle it directly The `AgentServer` provides a `requestPermission` callback to the `ClientSideConnection` that always selects the "allow" option. In background mode this is necessary (no human to ask). In interactive mode it currently does the same, with a TODO for future per-tool approval via SSE round-trips. -### Tree state capture +### Checkpoint capture -After every `Write` or `Edit` tool call, the server captures a git tree snapshot via `TreeTracker` and broadcasts it as a `_posthog/tree_snapshot` SSE event. A final snapshot is captured during session cleanup. This is how the client knows what files changed and can restore state for cloud↔local handoff. +After file-mutating tool calls, the server captures a git checkpoint via `HandoffCheckpointTracker` and broadcasts it as a `_posthog/git_checkpoint` SSE event. A final checkpoint is captured during session cleanup. This is how the client restores repo state for cloud↔local handoff. ### CLI @@ -200,7 +200,7 @@ For Codex adapters, `agent.run()` also fetches available models from the PostHog ## Log pipeline and session resume -Logs serve two purposes: real-time observability and session resume. Every ACP message that flows through the tapped streams is persisted, creating a complete record of the conversation — user messages, agent responses, tool calls, tool results, tree snapshots, and metadata events. This record is the single source of truth for resuming a session from any point. +Logs serve two purposes: real-time observability and session resume. Every ACP message that flows through the tapped streams is persisted, creating a complete record of the conversation — user messages, agent responses, tool calls, tool results, git checkpoints, and metadata events. This record is the single source of truth for resuming a session from any point. ### Writing logs @@ -218,10 +218,9 @@ When a session needs to continue (e.g. cloud↔local handoff, or recovering from ```text 1. fetch_task_run → GET /api/.../runs/{runId}/ to find the log_url 2. fetch_logs → Download all StoredNotification entries -3. find_snapshot → Scan backwards for latest _posthog/tree_snapshot -4. apply_snapshot → Download archive from snapshot URL, restore working tree -5. rebuild_conversation → Walk log entries to reconstruct conversation turns -6. find_device → Scan backwards for last device info (local vs cloud) +3. find_git_checkpoint → Scan backwards for latest _posthog/git_checkpoint +4. rebuild_conversation → Walk log entries to reconstruct conversation turns +5. find_device → Scan backwards for last device info (local vs cloud) ``` The conversation rebuild (`rebuildConversation`) walks the log entries and reassembles turns from ACP `session/update` notifications: @@ -231,9 +230,7 @@ The conversation rebuild (`rebuildConversation`) walks the log entries and reass - `tool_call` / `tool_call_update` → track tool calls with their inputs - `tool_result` → match results back to tool calls by `toolCallId` -The result is a `ResumeState` containing the conversation history as `ConversationTurn[]`, the latest tree snapshot, and metadata. This feeds into the ACP `session/load` or `_posthog/session/resume` methods on the Claude adapter, which initializes a new Claude SDK query with the rebuilt context. - -Snapshot application can fail without aborting the resume — if the archive URL is missing or the download fails, the saga logs a warning and continues with just the conversation history. The `snapshotApplied` flag in the result tells the caller whether files were actually restored. +The result is a `ResumeState` containing the conversation history as `ConversationTurn[]`, the latest git checkpoint, and metadata. This feeds into the ACP `session/load` or `_posthog/session/resume` methods on the Claude adapter, which initializes a new Claude SDK query with the rebuilt context. ## ACP extensions @@ -250,7 +247,7 @@ ACP defines standard methods like `session/prompt`, `session/update`, and `sessi **State synchronization** — events that keep the client's view of the agent's state in sync. These are essential for the cloud↔local handoff flow and for the client to render accurate UI. - `_posthog/branch_created` — `{ branch }` — agent created a git branch (client can update branch display) -- `_posthog/tree_snapshot` — `{ treeHash, baseCommit, changes, ... }` — git working tree captured as a snapshot. Contains the tree hash, base commit, file change list, and optionally an archive URL. This is the key event for session resume — the resume saga scans backwards for the latest snapshot to restore files +- `_posthog/git_checkpoint` — `{ checkpointId, checkpointRef, branch, head, indexTree, worktreeTree, ... }` — git checkpoint captured for resume and handoff. This is the key event for session resume — the resume saga scans backwards for the latest checkpoint to restore files - `_posthog/mode_change` — `{ mode, previous_mode }` — permission mode changed (client updates mode selector) - `_posthog/compact_boundary` — `{ sessionId, timestamp }` — marks where context compaction occurred, so the client knows the conversation was summarized at this point - `_posthog/task_notification` — `{ sessionId, type, message?, data? }` — generic extensible notification for adapter-specific events @@ -260,7 +257,7 @@ ACP defines standard methods like `session/prompt`, `session/update`, and `sessi - `_posthog/user_message` — `{ content }` — user typed a message (translated to `session/prompt`) - `_posthog/cancel` — cancel the current operation (translated to `session/cancel`) - `_posthog/close` — close the session and clean up -- `_posthog/session/resume` — `{ sessionId, fromSnapshot? }` — request to resume a previous session (triggers the resume flow on the Claude adapter) +- `_posthog/session/resume` — `{ sessionId }` — request to resume a previous session (triggers the resume flow on the Claude adapter) **Debug** — operational visibility without polluting the ACP conversation. diff --git a/packages/agent/package.json b/packages/agent/package.json index 55f3c9897..260f4c3c4 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -60,10 +60,6 @@ "types": "./dist/handoff-checkpoint.d.ts", "import": "./dist/handoff-checkpoint.js" }, - "./tree-tracker": { - "types": "./dist/tree-tracker.d.ts", - "import": "./dist/tree-tracker.js" - }, "./server": { "types": "./dist/server/agent-server.d.ts", "import": "./dist/server/agent-server.js" diff --git a/packages/agent/src/acp-extensions.ts b/packages/agent/src/acp-extensions.ts index 8e50bbe00..1d4f97dac 100644 --- a/packages/agent/src/acp-extensions.ts +++ b/packages/agent/src/acp-extensions.ts @@ -34,9 +34,6 @@ export const POSTHOG_NOTIFICATIONS = { /** Maps taskRunId to agent's sessionId and adapter type (for resumption) */ SDK_SESSION: "_posthog/sdk_session", - /** Tree state snapshot captured (git tree hash + file archive) */ - TREE_SNAPSHOT: "_posthog/tree_snapshot", - /** Git checkpoint captured for handoff */ GIT_CHECKPOINT: "_posthog/git_checkpoint", diff --git a/packages/agent/src/handoff-checkpoint.test.ts b/packages/agent/src/handoff-checkpoint.test.ts index 3f1587665..7c56a5e82 100644 --- a/packages/agent/src/handoff-checkpoint.test.ts +++ b/packages/agent/src/handoff-checkpoint.test.ts @@ -20,8 +20,6 @@ interface HandoffRepos { localGitState: HandoffLocalGitState; } -const WORKTREE_FILES = ["tracked.txt", "unstaged.txt", "untracked.txt"]; - function createMockApi(store: BundleStore) { return { uploadTaskArtifacts: async ( @@ -64,7 +62,7 @@ function createBundleStore(): BundleStore { artifacts: {}, manifest: [ { - storage_path: "gs://bucket/handoff-0-existing-tree_snapshot.tar.gz", + storage_path: "gs://bucket/handoff-0-existing-checkpoint.pack", }, ], }; @@ -128,15 +126,6 @@ async function makeCloudChanges(repo: TestRepo): Promise { await repo.writeFile("untracked.txt", "untracked\n"); } -async function mirrorRestoredWorktree( - cloudRepo: TestRepo, - localRepo: TestRepo, -): Promise { - for (const file of WORKTREE_FILES) { - await localRepo.writeFile(file, await cloudRepo.readFile(file)); - } -} - describe("HandoffCheckpointTracker", () => { const cleanups: Array<() => Promise> = []; @@ -144,7 +133,7 @@ describe("HandoffCheckpointTracker", () => { await Promise.all(cleanups.splice(0).map((cleanup) => cleanup())); }); - it("restores head commit and index state for handoff replay", async () => { + it("restores head, worktree, and index state for handoff replay", async () => { const { cloudRepo, localRepo, branch, localGitState } = await prepareHandoffRepos(cleanups); await makeCloudChanges(cloudRepo); @@ -162,10 +151,6 @@ describe("HandoffCheckpointTracker", () => { const applyTracker = createTracker(localRepo.path, apiClient); await applyTracker.applyFromHandoff(checkpoint); - // The handoff service restores files separately via tree_snapshot. - // Mirror that here so the restored git index can be validated. - await mirrorRestoredWorktree(cloudRepo, localRepo); - expect(await localRepo.git(["rev-parse", "HEAD"])).toBe(checkpoint.head); expect(await localRepo.git(["rev-parse", "--abbrev-ref", "HEAD"])).toBe( branch, diff --git a/packages/agent/src/handoff-checkpoint.ts b/packages/agent/src/handoff-checkpoint.ts index 35c9bcdd1..4d830464c 100644 --- a/packages/agent/src/handoff-checkpoint.ts +++ b/packages/agent/src/handoff-checkpoint.ts @@ -1,12 +1,6 @@ -import { - mkdir, - readdir, - readFile, - rm, - rmdir, - writeFile, -} from "node:fs/promises"; -import { join } from "node:path"; +import { mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { dirname, join } from "node:path"; import { type GitHandoffBranchDivergence, type GitHandoffCheckpoint, @@ -107,8 +101,12 @@ export class HandoffCheckpointTracker { indexArtifactPath: uploads.index?.storagePath, }; } finally { + const tempDir = capture.headPack?.path + ? dirname(capture.headPack.path) + : dirname(capture.indexFile.path); await this.removeIfPresent(capture.headPack?.path); await this.removeIfPresent(capture.indexFile.path); + await rm(tempDir, { recursive: true, force: true }).catch(() => {}); } } @@ -128,8 +126,9 @@ export class HandoffCheckpointTracker { } const gitTracker = this.createGitTracker(); - const tmpDir = join(this.repositoryPath, ".posthog", "tmp"); - await mkdir(tmpDir, { recursive: true }); + const tmpDir = await mkdtemp( + join(tmpdir(), `posthog-code-handoff-${checkpoint.checkpointId}-`), + ); const packPath = join(tmpDir, `${checkpoint.checkpointId}.pack`); const indexPath = join(tmpDir, `${checkpoint.checkpointId}.index`); @@ -168,7 +167,7 @@ export class HandoffCheckpointTracker { } finally { await this.removeIfPresent(packPath); await this.removeIfPresent(indexPath); - await this.removeTmpDirIfEmpty(tmpDir); + await rm(tmpDir, { recursive: true, force: true }).catch(() => {}); } } @@ -299,52 +298,24 @@ export class HandoffCheckpointTracker { uploads: Uploads, ): void { this.logger.info("Captured handoff checkpoint", { - checkpointId: checkpoint.checkpointId, branch: checkpoint.branch, - head: checkpoint.head, - artifactPath: uploads.pack?.storagePath, - indexArtifactPath: uploads.index?.storagePath, - ...this.buildMetricPayload(uploads), + head: checkpoint.head?.slice(0, 7), + totalBytes: this.sumRawBytes(uploads.pack, uploads.index), }); } private logApplyMetrics( checkpoint: GitCheckpoint, - downloads: Downloads, + _downloads: Downloads, totalBytes: number, ): void { this.logger.info("Applied handoff checkpoint", { - checkpointId: checkpoint.checkpointId, - commit: checkpoint.commit, branch: checkpoint.branch, - head: checkpoint.head, - packBytes: downloads.pack?.rawBytes ?? 0, - packWireBytes: downloads.pack?.wireBytes ?? 0, - indexBytes: downloads.index?.rawBytes ?? 0, - indexWireBytes: downloads.index?.wireBytes ?? 0, + head: checkpoint.head?.slice(0, 7), totalBytes, - totalWireBytes: this.sumWireBytes(downloads.pack, downloads.index), }); } - private buildMetricPayload(metrics: ArtifactSlotMap): { - packBytes: number; - packWireBytes: number; - indexBytes: number; - indexWireBytes: number; - totalBytes: number; - totalWireBytes: number; - } { - return { - packBytes: metrics.pack?.rawBytes ?? 0, - packWireBytes: metrics.pack?.wireBytes ?? 0, - indexBytes: metrics.index?.rawBytes ?? 0, - indexWireBytes: metrics.index?.wireBytes ?? 0, - totalBytes: this.sumRawBytes(metrics.pack, metrics.index), - totalWireBytes: this.sumWireBytes(metrics.pack, metrics.index), - }; - } - private sumRawBytes( ...artifacts: Array<{ rawBytes: number } | undefined> ): number { @@ -354,27 +325,10 @@ export class HandoffCheckpointTracker { ); } - private sumWireBytes( - ...artifacts: Array<{ wireBytes: number } | undefined> - ): number { - return artifacts.reduce( - (total, artifact) => total + (artifact?.wireBytes ?? 0), - 0, - ); - } - private async removeIfPresent(filePath: string | undefined): Promise { if (!filePath) { return; } await rm(filePath, { force: true }).catch(() => {}); } - - private async removeTmpDirIfEmpty(tmpDir: string): Promise { - const entries = await readdir(tmpDir).catch(() => null); - if (!entries || entries.length > 0) { - return; - } - await rmdir(tmpDir).catch(() => {}); - } } diff --git a/packages/agent/src/resume.ts b/packages/agent/src/resume.ts index dd7bd1bc8..2f4318182 100644 --- a/packages/agent/src/resume.ts +++ b/packages/agent/src/resume.ts @@ -3,15 +3,15 @@ * * Handles resuming a task from any point: * - Fetches log via the PostHog API - * - Finds latest tree_snapshot event + * - Finds latest git_checkpoint event * - Rebuilds conversation from log events - * - Restores working tree from snapshot + * - Restores working tree from checkpoint * * Uses Saga pattern for atomic operations with clear success/failure tracking. * * The log is the single source of truth for: * - Conversation history (user_message, agent_message_chunk, tool_call, tool_result) - * - Working tree state (tree_snapshot events) + * - Working tree state (git_checkpoint events) * - Session metadata (device info, mode changes) */ @@ -19,16 +19,11 @@ import type { ContentBlock } from "@agentclientprotocol/sdk"; import { selectRecentTurns } from "./adapters/claude/session/jsonl-hydration"; import type { PostHogAPIClient } from "./posthog-api"; import { ResumeSaga } from "./sagas/resume-saga"; -import type { - DeviceInfo, - GitCheckpointEvent, - TreeSnapshotEvent, -} from "./types"; +import type { DeviceInfo, GitCheckpointEvent } from "./types"; import { Logger } from "./utils/logger"; export interface ResumeState { conversation: ConversationTurn[]; - latestSnapshot: TreeSnapshotEvent | null; latestGitCheckpoint: GitCheckpointEvent | null; interrupted: boolean; lastDevice?: DeviceInfo; @@ -59,7 +54,7 @@ export interface ResumeConfig { /** * Resume a task from its persisted log. * Returns the rebuilt state for the agent to continue from. - * Snapshot and checkpoint application happens in the agent server after SSE connects. + * Checkpoint application happens in the agent server after SSE connects. */ export async function resumeFromLog( config: ResumeConfig, @@ -94,7 +89,6 @@ export async function resumeFromLog( return { conversation: result.data.conversation as ConversationTurn[], - latestSnapshot: result.data.latestSnapshot, latestGitCheckpoint: result.data.latestGitCheckpoint, interrupted: result.data.interrupted, lastDevice: result.data.lastDevice, diff --git a/packages/agent/src/sagas/apply-snapshot-saga.test.ts b/packages/agent/src/sagas/apply-snapshot-saga.test.ts deleted file mode 100644 index b9ef1e50f..000000000 --- a/packages/agent/src/sagas/apply-snapshot-saga.test.ts +++ /dev/null @@ -1,691 +0,0 @@ -import { join } from "node:path"; -import type { SagaLogger } from "@posthog/shared"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { ApplySnapshotSaga } from "./apply-snapshot-saga"; -import { - createArchiveBuffer, - createMockApiClient, - createMockLogger, - createSnapshot, - createTestRepo, - type TestRepo, -} from "./test-fixtures"; - -describe("ApplySnapshotSaga", () => { - let repo: TestRepo; - let mockLogger: SagaLogger; - - beforeEach(async () => { - repo = await createTestRepo("apply-snapshot"); - mockLogger = createMockLogger(); - }); - - afterEach(async () => { - await repo.cleanup(); - }); - - describe("file restoration", () => { - it("extracts files from archive", async () => { - const archive = await createArchiveBuffer([ - { path: "new-file.ts", content: "console.log('restored')" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - changes: [{ path: "new-file.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(true); - expect(repo.exists("new-file.ts")).toBe(true); - expect(await repo.readFile("new-file.ts")).toBe( - "console.log('restored')", - ); - }); - - it("extracts files in nested directories", async () => { - const archive = await createArchiveBuffer([ - { - path: "src/components/Button.tsx", - content: "export const Button = () => {}", - }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - changes: [{ path: "src/components/Button.tsx", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(true); - expect(repo.exists("src/components/Button.tsx")).toBe(true); - }); - - it("overwrites existing files with archive content", async () => { - await repo.writeFile("existing.ts", "old content"); - - const archive = await createArchiveBuffer([ - { path: "existing.ts", content: "new content from archive" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - changes: [{ path: "existing.ts", status: "M" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(await repo.readFile("existing.ts")).toBe( - "new content from archive", - ); - }); - - it("deletes files marked as deleted", async () => { - await repo.writeFile("to-delete.ts", "delete me"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add file"]); - - const archive = await createArchiveBuffer([ - { path: "placeholder.txt", content: "placeholder" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - changes: [{ path: "to-delete.ts", status: "D" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(repo.exists("to-delete.ts")).toBe(false); - }); - - it("handles mixed add/modify/delete changes", async () => { - await repo.writeFile("to-modify.ts", "original"); - await repo.writeFile("to-delete.ts", "delete me"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Setup"]); - - const archive = await createArchiveBuffer([ - { path: "new-file.ts", content: "added" }, - { path: "to-modify.ts", content: "modified" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - changes: [ - { path: "new-file.ts", status: "A" }, - { path: "to-modify.ts", status: "M" }, - { path: "to-delete.ts", status: "D" }, - ], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(repo.exists("new-file.ts")).toBe(true); - expect(await repo.readFile("new-file.ts")).toBe("added"); - expect(await repo.readFile("to-modify.ts")).toBe("modified"); - expect(repo.exists("to-delete.ts")).toBe(false); - }); - }); - - describe("base commit checkout", () => { - it("checks out base commit when different from current HEAD", async () => { - const initialCommit = await repo.git(["rev-parse", "HEAD"]); - - await repo.writeFile("new.ts", "content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Second commit"]); - - const archive = await createArchiveBuffer([ - { path: "restored.ts", content: "restored" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - baseCommit: initialCommit, - changes: [{ path: "restored.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - const currentHead = await repo.git(["rev-parse", "HEAD"]); - expect(currentHead).toBe(initialCommit); - }); - - it("skips checkout when base commit matches current HEAD", async () => { - const currentHead = await repo.git(["rev-parse", "HEAD"]); - - const archive = await createArchiveBuffer([ - { path: "file.ts", content: "content" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - baseCommit: currentHead, - changes: [{ path: "file.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - const newHead = await repo.git(["rev-parse", "HEAD"]); - expect(newHead).toBe(currentHead); - }); - - it("skips checkout when base commit is null", async () => { - const currentHead = await repo.git(["rev-parse", "HEAD"]); - - const archive = await createArchiveBuffer([ - { path: "file.ts", content: "content" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - baseCommit: null, - changes: [{ path: "file.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - const newHead = await repo.git(["rev-parse", "HEAD"]); - expect(newHead).toBe(currentHead); - }); - }); - - describe("failure handling", () => { - it("fails when snapshot has no archive URL", async () => { - const mockApiClient = createMockApiClient(); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ archiveUrl: undefined }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(false); - if (!result.success) { - expect(result.error).toContain("no archive URL"); - } - }); - - it("fails when download returns null", async () => { - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(null), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot(), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(false); - if (!result.success) { - expect(result.failedStep).toBe("download_archive"); - } - }); - - it("fails when download throws", async () => { - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockRejectedValue(new Error("Network error")), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot(), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(false); - if (!result.success) { - expect(result.error).toContain("Network error"); - } - }); - - it("cleans up downloaded archive on success", async () => { - const archive = await createArchiveBuffer([ - { path: "file.ts", content: "content" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - changes: [{ path: "file.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(repo.exists(".posthog/tmp/test-tree-hash.tar.gz")).toBe(false); - expect(repo.exists(".posthog/tmp")).toBe(false); - }); - - it("cleans up downloaded archive on checkout failure (rollback verification)", async () => { - const initialCommit = await repo.git(["rev-parse", "HEAD"]); - - await repo.writeFile("conflicting.ts", "original content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add file"]); - - await repo.writeFile("conflicting.ts", "uncommitted changes"); - - const archive = await createArchiveBuffer([ - { path: "restored.ts", content: "restored" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - treeHash: "checkout-fail-hash", - baseCommit: initialCommit, - changes: [{ path: "restored.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(false); - - expect(repo.exists(".posthog/tmp/checkout-fail-hash.tar.gz")).toBe(false); - }); - - it("cleans up downloaded archive on extract failure (rollback verification)", async () => { - const invalidArchive = Buffer.from("not a valid tar.gz"); - - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(invalidArchive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - treeHash: "extract-fail-hash", - changes: [{ path: "file.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(false); - - expect(repo.exists(".posthog/tmp/extract-fail-hash.tar.gz")).toBe(false); - }); - }); - - describe("dirty working directory", () => { - it("fails early when repo has uncommitted changes before checkout", async () => { - const initialCommit = await repo.git(["rev-parse", "HEAD"]); - - await repo.writeFile("file.ts", "content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add file"]); - - const secondCommit = await repo.git(["rev-parse", "HEAD"]); - - await repo.writeFile("file.ts", "modified but not committed"); - - const archive = await createArchiveBuffer([ - { path: "restored.ts", content: "restored" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - baseCommit: initialCommit, - changes: [{ path: "restored.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(false); - if (!result.success) { - expect(result.error).toContain("uncommitted change"); - } - - const currentHead = await repo.git(["rev-parse", "HEAD"]); - expect(currentHead).toBe(secondCommit); - }); - - it("skips working tree check when base commit matches current HEAD", async () => { - const currentHead = await repo.git(["rev-parse", "HEAD"]); - - await repo.writeFile("uncommitted.ts", "uncommitted content"); - - const archive = await createArchiveBuffer([ - { path: "restored.ts", content: "restored" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - baseCommit: currentHead, - changes: [{ path: "restored.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(true); - expect(repo.exists("restored.ts")).toBe(true); - }); - - it("leaves user in detached HEAD after applying snapshot with different base", async () => { - const initialCommit = await repo.git(["rev-parse", "HEAD"]); - - await repo.writeFile("other.ts", "content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "New commit"]); - - const archive = await createArchiveBuffer([ - { path: "restored.ts", content: "restored" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - baseCommit: initialCommit, - changes: [{ path: "restored.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - const branchOutput = await repo.git(["branch", "--show-current"]); - expect(branchOutput).toBe(""); - - const headRef = await repo - .git(["symbolic-ref", "HEAD"]) - .catch(() => "detached"); - expect(headRef).toBe("detached"); - }); - - it("logs warning about detached HEAD state", async () => { - const initialCommit = await repo.git(["rev-parse", "HEAD"]); - - await repo.writeFile("other.ts", "content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "New commit"]); - - const archive = await createArchiveBuffer([ - { path: "restored.ts", content: "restored" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - baseCommit: initialCommit, - changes: [{ path: "restored.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(mockLogger.warn).toHaveBeenCalledWith( - "Applied tree from different commit - now in detached HEAD state", - expect.objectContaining({ - originalBranch: expect.any(String), - baseCommit: initialCommit, - }), - ); - }); - - it("rolls back to original branch on failure after checkout", async () => { - const initialCommit = await repo.git(["rev-parse", "HEAD"]); - const originalBranch = await repo.git(["branch", "--show-current"]); - - await repo.writeFile("other.ts", "content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "New commit"]); - - const invalidArchive = Buffer.from("not a valid tar.gz"); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(invalidArchive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - baseCommit: initialCommit, - changes: [{ path: "restored.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(false); - - const currentBranch = await repo.git(["branch", "--show-current"]); - expect(currentBranch).toBe(originalBranch); - }); - }); - - describe("edge cases", () => { - it("handles empty snapshot (no changes)", async () => { - const archive = await createArchiveBuffer([ - { path: "placeholder.txt", content: "placeholder" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ changes: [] }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(true); - }); - - it("handles files with spaces in names", async () => { - const archive = await createArchiveBuffer([ - { path: "file with spaces.ts", content: "content" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - await saga.run({ - snapshot: createSnapshot({ - changes: [{ path: "file with spaces.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(repo.exists("file with spaces.ts")).toBe(true); - }); - - it("deleting non-existent file does not fail", async () => { - const archive = await createArchiveBuffer([ - { path: "placeholder.txt", content: "placeholder" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - changes: [{ path: "does-not-exist.ts", status: "D" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(true); - }); - - it("returns tree hash on success", async () => { - const archive = await createArchiveBuffer([ - { path: "file.ts", content: "content" }, - ]); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - treeHash: "my-tree-hash", - changes: [{ path: "file.ts", status: "A" }], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.treeHash).toBe("my-tree-hash"); - } - }); - - it("preserves symlinks in archive extraction", async () => { - const { lstat, readlink } = await import("node:fs/promises"); - - const archive = await createArchiveBuffer( - [{ path: "target.txt", content: "symlink target content" }], - [{ path: "link.txt", target: "target.txt" }], - ); - const mockApiClient = createMockApiClient({ - downloadArtifact: vi.fn().mockResolvedValue(archive), - }); - - const saga = new ApplySnapshotSaga(mockLogger); - const result = await saga.run({ - snapshot: createSnapshot({ - changes: [ - { path: "target.txt", status: "A" }, - { path: "link.txt", status: "A" }, - ], - }), - repositoryPath: repo.path, - apiClient: mockApiClient, - taskId: "task-1", - runId: "run-1", - }); - - expect(result.success).toBe(true); - expect(repo.exists("target.txt")).toBe(true); - expect(repo.exists("link.txt")).toBe(true); - - const linkPath = join(repo.path, "link.txt"); - const stats = await lstat(linkPath); - expect(stats.isSymbolicLink()).toBe(true); - - const linkTarget = await readlink(linkPath); - expect(linkTarget).toBe("target.txt"); - }); - }); -}); diff --git a/packages/agent/src/sagas/apply-snapshot-saga.ts b/packages/agent/src/sagas/apply-snapshot-saga.ts deleted file mode 100644 index ab0e554de..000000000 --- a/packages/agent/src/sagas/apply-snapshot-saga.ts +++ /dev/null @@ -1,114 +0,0 @@ -import { mkdir, readdir, rm, rmdir, writeFile } from "node:fs/promises"; -import { join } from "node:path"; -import { ApplyTreeSaga as GitApplyTreeSaga } from "@posthog/git/sagas/tree"; -import { Saga } from "@posthog/shared"; -import type { PostHogAPIClient } from "../posthog-api"; -import type { TreeSnapshot } from "../types"; - -export interface ApplySnapshotInput { - snapshot: TreeSnapshot; - repositoryPath: string; - apiClient: PostHogAPIClient; - taskId: string; - runId: string; -} - -export interface ApplySnapshotOutput { - treeHash: string; -} - -export class ApplySnapshotSaga extends Saga< - ApplySnapshotInput, - ApplySnapshotOutput -> { - readonly sagaName = "ApplySnapshotSaga"; - - private archivePath: string | null = null; - - protected async execute( - input: ApplySnapshotInput, - ): Promise { - const { snapshot, repositoryPath, apiClient, taskId, runId } = input; - const tmpDir = join(repositoryPath, ".posthog", "tmp"); - - if (!snapshot.archiveUrl) { - throw new Error("Cannot apply snapshot: no archive URL"); - } - - const archiveUrl = snapshot.archiveUrl; - - try { - await this.step({ - name: "create_tmp_dir", - execute: () => mkdir(tmpDir, { recursive: true }), - rollback: async () => {}, - }); - - const archivePath = join(tmpDir, `${snapshot.treeHash}.tar.gz`); - this.archivePath = archivePath; - await this.step({ - name: "download_archive", - execute: async () => { - const arrayBuffer = await apiClient.downloadArtifact( - taskId, - runId, - archiveUrl, - ); - if (!arrayBuffer) { - throw new Error("Failed to download archive"); - } - const base64Content = Buffer.from(arrayBuffer).toString("utf-8"); - const binaryContent = Buffer.from(base64Content, "base64"); - await writeFile(archivePath, binaryContent); - this.log.info("Tree archive downloaded", { - treeHash: snapshot.treeHash, - snapshotBytes: binaryContent.byteLength, - snapshotWireBytes: arrayBuffer.byteLength, - totalBytes: binaryContent.byteLength, - totalWireBytes: arrayBuffer.byteLength, - }); - }, - rollback: async () => { - if (this.archivePath) { - await rm(this.archivePath, { force: true }).catch(() => {}); - } - }, - }); - - const gitApplySaga = new GitApplyTreeSaga(this.log); - const applyResult = await gitApplySaga.run({ - baseDir: repositoryPath, - treeHash: snapshot.treeHash, - baseCommit: snapshot.baseCommit, - changes: snapshot.changes, - archivePath: this.archivePath, - }); - - if (!applyResult.success) { - throw new Error(`Failed to apply tree: ${applyResult.error}`); - } - - this.log.info("Tree snapshot applied", { - treeHash: snapshot.treeHash, - totalChanges: snapshot.changes.length, - deletedFiles: snapshot.changes.filter((c) => c.status === "D").length, - }); - - return { treeHash: snapshot.treeHash }; - } finally { - if (this.archivePath) { - await rm(this.archivePath, { force: true }).catch(() => {}); - } - await this.removeTmpDirIfEmpty(tmpDir); - this.archivePath = null; - } - } - - private async removeTmpDirIfEmpty(tmpDir: string): Promise { - const entries = await readdir(tmpDir).catch(() => null); - if (!entries || entries.length > 0) { - return; - } - await rmdir(tmpDir).catch(() => {}); - } -} diff --git a/packages/agent/src/sagas/capture-tree-saga.test.ts b/packages/agent/src/sagas/capture-tree-saga.test.ts deleted file mode 100644 index 39d60e46e..000000000 --- a/packages/agent/src/sagas/capture-tree-saga.test.ts +++ /dev/null @@ -1,910 +0,0 @@ -import { join } from "node:path"; -import type { SagaLogger } from "@posthog/shared"; -import { afterEach, beforeEach, describe, expect, it, type vi } from "vitest"; -import { isCommitOnRemote, validateForCloudHandoff } from "../tree-tracker"; -import { CaptureTreeSaga } from "./capture-tree-saga"; -import { - createMockApiClient, - createMockLogger, - createSnapshot, - createTestRepo, - type TestRepo, -} from "./test-fixtures"; - -describe("CaptureTreeSaga", () => { - let repo: TestRepo; - let mockLogger: SagaLogger; - - beforeEach(async () => { - repo = await createTestRepo("capture-tree"); - mockLogger = createMockLogger(); - }); - - afterEach(async () => { - await repo.cleanup(); - }); - - describe("no changes", () => { - it("returns null snapshot when tree hash matches last capture", async () => { - const saga = new CaptureTreeSaga(mockLogger); - - const firstResult = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (secondResult.success) { - expect(secondResult.data.snapshot).toBeNull(); - expect(secondResult.data.newTreeHash).toBe( - firstResult.data.newTreeHash, - ); - } - }); - }); - - describe("capturing changes", () => { - it("captures added files", async () => { - await repo.writeFile("new-file.ts", "console.log('hello')"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - if (!result.success) return; - - expect(result.data.snapshot).not.toBeNull(); - expect(result.data.snapshot?.changes).toContainEqual({ - path: "new-file.ts", - status: "A", - }); - }); - - it("captures modified files", async () => { - const saga = new CaptureTreeSaga(mockLogger); - - const firstResult = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - await repo.writeFile("README.md", "# Modified"); - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - - expect(secondResult.data.snapshot?.changes).toContainEqual({ - path: "README.md", - status: "M", - }); - }); - - it("captures deleted files", async () => { - await repo.writeFile("to-delete.ts", "delete me"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add file to delete"]); - - const saga = new CaptureTreeSaga(mockLogger); - const firstResult = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - await repo.deleteFile("to-delete.ts"); - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - - expect(secondResult.data.snapshot?.changes).toContainEqual({ - path: "to-delete.ts", - status: "D", - }); - }); - - it("captures mixed changes", async () => { - await repo.writeFile("existing.ts", "original"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add existing"]); - - const saga = new CaptureTreeSaga(mockLogger); - const firstResult = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - await repo.writeFile("new.ts", "new file"); - await repo.writeFile("existing.ts", "modified"); - await repo.deleteFile("README.md"); - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - - const changes = secondResult.data.snapshot?.changes ?? []; - expect(changes).toContainEqual({ path: "new.ts", status: "A" }); - expect(changes).toContainEqual({ path: "existing.ts", status: "M" }); - expect(changes).toContainEqual({ path: "README.md", status: "D" }); - }); - - it("sets interrupted flag when provided", async () => { - await repo.writeFile("file.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - interrupted: true, - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.snapshot?.interrupted).toBe(true); - } - }); - - it("includes base commit in snapshot", async () => { - const headCommit = await repo.git(["rev-parse", "HEAD"]); - await repo.writeFile("file.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.snapshot?.baseCommit).toBe(headCommit); - } - }); - }); - - describe("exclusions", () => { - it("excludes .posthog directory from changes", async () => { - await repo.writeFile(".posthog/config.json", "{}"); - await repo.writeFile("regular.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - if (!result.success) return; - - const changes = result.data.snapshot?.changes ?? []; - expect(changes.find((c) => c.path.includes(".posthog"))).toBeUndefined(); - expect(changes.find((c) => c.path === "regular.ts")).toBeDefined(); - }); - }); - - describe("archive upload", () => { - it("uploads archive when API client provided", async () => { - const mockApiClient = createMockApiClient(); - await repo.writeFile("new.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - apiClient: mockApiClient, - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.snapshot?.archiveUrl).toBe( - "gs://bucket/trees/test.tar.gz", - ); - } - expect(mockApiClient.uploadTaskArtifacts).toHaveBeenCalled(); - }); - - it("skips upload when only deletions", async () => { - await repo.writeFile("to-delete.ts", "delete me"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add file"]); - - const saga = new CaptureTreeSaga(mockLogger); - const firstResult = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - await repo.deleteFile("to-delete.ts"); - - const mockApiClient = createMockApiClient(); - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - apiClient: mockApiClient, - }); - - expect(secondResult.success).toBe(true); - expect(mockApiClient.uploadTaskArtifacts).not.toHaveBeenCalled(); - }); - - it("handles upload failure", async () => { - const mockApiClient = createMockApiClient(); - ( - mockApiClient.uploadTaskArtifacts as ReturnType - ).mockRejectedValue(new Error("Network error")); - - await repo.writeFile("new.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - apiClient: mockApiClient, - }); - - expect(result.success).toBe(false); - if (!result.success) { - expect(result.failedStep).toBe("upload_archive"); - } - }); - - it("cleans up temp index and archive on upload failure (rollback verification)", async () => { - const { readdir } = await import("node:fs/promises"); - - const mockApiClient = createMockApiClient(); - ( - mockApiClient.uploadTaskArtifacts as ReturnType - ).mockRejectedValue(new Error("Network error")); - - await repo.writeFile("new.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - apiClient: mockApiClient, - }); - - expect(result.success).toBe(false); - - const tmpDir = join(repo.path, ".posthog", "tmp"); - const files = await readdir(tmpDir).catch(() => []); - - const indexFiles = files.filter((f: string) => f.startsWith("index-")); - expect(indexFiles).toHaveLength(0); - - const archiveFiles = files.filter((f: string) => f.endsWith(".tar.gz")); - expect(archiveFiles).toHaveLength(0); - }); - - it("cleans up temp index on success", async () => { - const { readdir } = await import("node:fs/promises"); - - await repo.writeFile("new.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - - const tmpDir = join(repo.path, ".posthog", "tmp"); - const files = await readdir(tmpDir).catch(() => []); - const indexFiles = files.filter((f: string) => f.startsWith("index-")); - expect(indexFiles).toHaveLength(0); - }); - - it("cleans up uploaded tree archive and tmp dir on success", async () => { - const mockApiClient = createMockApiClient(); - - await repo.writeFile("new.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - apiClient: mockApiClient, - }); - - expect(result.success).toBe(true); - expect(repo.exists(".posthog/tmp")).toBe(false); - }); - }); - - describe("git state isolation", () => { - it("does not modify user's staged files", async () => { - await repo.writeFile("staged.ts", "staged content"); - await repo.git(["add", "staged.ts"]); - - await repo.writeFile("unstaged.ts", "unstaged content"); - - const saga = new CaptureTreeSaga(mockLogger); - await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - const status = await repo.git(["status", "--porcelain"]); - expect(status).toContain("A staged.ts"); - expect(status).toContain("?? unstaged.ts"); - }); - - it("does not affect working directory", async () => { - await repo.writeFile("file.ts", "original content"); - - const saga = new CaptureTreeSaga(mockLogger); - await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - const content = await repo.readFile("file.ts"); - expect(content).toBe("original content"); - }); - }); - - describe("concurrent captures", () => { - it("handles concurrent captures without interference", async () => { - await repo.writeFile("file1.ts", "content1"); - - const saga1 = new CaptureTreeSaga(mockLogger); - const saga2 = new CaptureTreeSaga(mockLogger); - - const [result1, result2] = await Promise.all([ - saga1.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }), - saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: null, - }), - ]); - - expect(result1.success).toBe(true); - expect(result2.success).toBe(true); - - if (result1.success && result2.success) { - expect(result1.data.snapshot?.changes).toContainEqual({ - path: "file1.ts", - status: "A", - }); - expect(result2.data.snapshot?.changes).toContainEqual({ - path: "file1.ts", - status: "A", - }); - } - }); - }); - - describe("renamed files", () => { - it("captures renamed files as delete + add (without -M flag)", async () => { - await repo.writeFile("old-name.ts", "content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add original file"]); - - const saga = new CaptureTreeSaga(mockLogger); - const firstResult = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - await repo.git(["mv", "old-name.ts", "new-name.ts"]); - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - - const changes = secondResult.data.snapshot?.changes ?? []; - expect(changes).toContainEqual({ path: "old-name.ts", status: "D" }); - expect(changes).toContainEqual({ path: "new-name.ts", status: "A" }); - }); - - it("captures renamed files with modifications", async () => { - await repo.writeFile("original.ts", "original content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add original file"]); - - const saga = new CaptureTreeSaga(mockLogger); - const firstResult = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - await repo.git(["mv", "original.ts", "renamed.ts"]); - await repo.writeFile("renamed.ts", "modified content"); - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - - const changes = secondResult.data.snapshot?.changes ?? []; - expect(changes).toContainEqual({ path: "original.ts", status: "D" }); - expect( - changes.some( - (c) => - c.path === "renamed.ts" && (c.status === "A" || c.status === "M"), - ), - ).toBe(true); - }); - }); - - describe("edge cases", () => { - it("handles files with spaces in names", async () => { - await repo.writeFile("file with spaces.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.snapshot?.changes).toContainEqual({ - path: "file with spaces.ts", - status: "A", - }); - } - }); - - it("handles nested directories", async () => { - await repo.writeFile( - "src/components/Button.tsx", - "export const Button = () => {}", - ); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.snapshot?.changes).toContainEqual({ - path: "src/components/Button.tsx", - status: "A", - }); - } - }); - - it("handles binary files", async () => { - const binaryContent = Buffer.from([0x00, 0xff, 0x00, 0xff]); - const { writeFile: fsWriteFile } = await import("node:fs/promises"); - await fsWriteFile(join(repo.path, "binary.bin"), binaryContent); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.snapshot?.changes).toContainEqual({ - path: "binary.bin", - status: "A", - }); - } - }); - - it("handles symlinks", async () => { - const { symlink } = await import("node:fs/promises"); - - await repo.writeFile("target.txt", "symlink target content"); - await symlink("target.txt", join(repo.path, "link.txt")); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - if (result.success) { - expect(result.data.snapshot?.changes).toContainEqual({ - path: "link.txt", - status: "A", - }); - } - }); - }); - - describe("delta calculation", () => { - it("always calculates delta against HEAD, not lastTreeHash", async () => { - await repo.writeFile("file1.ts", "content1"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add file1"]); - - await repo.writeFile("file2.ts", "content2"); - - const saga1 = new CaptureTreeSaga(mockLogger); - const firstResult = await saga1.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - expect(firstResult.data.snapshot?.changes).toContainEqual({ - path: "file2.ts", - status: "A", - }); - - await repo.writeFile("file3.ts", "content3"); - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - - const changes = secondResult.data.snapshot?.changes ?? []; - expect(changes).toContainEqual({ path: "file2.ts", status: "A" }); - expect(changes).toContainEqual({ path: "file3.ts", status: "A" }); - }); - - it("second capture shows full delta from HEAD (not incremental)", async () => { - await repo.writeFile("existing.ts", "original"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Add existing"]); - - await repo.writeFile("existing.ts", "modified"); - - const saga1 = new CaptureTreeSaga(mockLogger); - const firstResult = await saga1.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - expect(firstResult.data.snapshot?.changes).toContainEqual({ - path: "existing.ts", - status: "M", - }); - - // Make another change to trigger a new capture (otherwise skip-unchanged kicks in) - await repo.writeFile("existing.ts", "modified again"); - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - - // Even though only the content of existing.ts changed since last capture, - // the delta should still show M (modified from HEAD), not just incremental changes - expect(secondResult.data.snapshot?.changes).toContainEqual({ - path: "existing.ts", - status: "M", - }); - }); - - it("uses lastTreeHash only for skip-unchanged optimization", async () => { - await repo.writeFile("file.ts", "content"); - - const saga1 = new CaptureTreeSaga(mockLogger); - const firstResult = await saga1.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - expect(firstResult.success).toBe(true); - if (!firstResult.success) return; - - const saga2 = new CaptureTreeSaga(mockLogger); - const secondResult = await saga2.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-2", - lastTreeHash: firstResult.data.newTreeHash, - }); - - expect(secondResult.success).toBe(true); - if (!secondResult.success) return; - expect(secondResult.data.snapshot).toBeNull(); - expect(secondResult.data.newTreeHash).toBe(firstResult.data.newTreeHash); - }); - }); - - describe("submodule detection", () => { - it("warns when repository has .gitmodules file", async () => { - await repo.writeFile( - ".gitmodules", - '[submodule "vendor/lib"]\n\tpath = vendor/lib\n\turl = https://example.com/lib.git', - ); - await repo.writeFile("file.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - expect(mockLogger.warn).toHaveBeenCalledWith( - "Repository has submodules - snapshot may not capture submodule state", - ); - }); - - it("does not warn when repository has no submodules", async () => { - await repo.writeFile("file.ts", "content"); - - const saga = new CaptureTreeSaga(mockLogger); - const result = await saga.run({ - repositoryPath: repo.path, - taskId: "task-1", - runId: "run-1", - lastTreeHash: null, - }); - - expect(result.success).toBe(true); - expect(mockLogger.warn).not.toHaveBeenCalledWith( - expect.stringContaining("submodules"), - ); - }); - }); -}); - -describe("validateForCloudHandoff", () => { - let repo: TestRepo; - - beforeEach(async () => { - repo = await createTestRepo("cloud-handoff"); - }); - - afterEach(async () => { - await repo.cleanup(); - }); - - it("throws error when snapshot has no base commit", async () => { - const snapshot = createSnapshot({ baseCommit: null }); - - await expect(validateForCloudHandoff(snapshot, repo.path)).rejects.toThrow( - "Cannot hand off to cloud: no base commit", - ); - }); - - it("throws error when base commit is not on any remote", async () => { - const headCommit = await repo.git(["rev-parse", "HEAD"]); - const snapshot = createSnapshot({ baseCommit: headCommit }); - - await expect(validateForCloudHandoff(snapshot, repo.path)).rejects.toThrow( - /is not pushed.*Run 'git push'/, - ); - }); - - it("succeeds when base commit is on remote", async () => { - const { execFile } = await import("node:child_process"); - const { promisify } = await import("node:util"); - const { tmpdir } = await import("node:os"); - const { mkdir, rm } = await import("node:fs/promises"); - const { join } = await import("node:path"); - - const execFileAsync = promisify(execFile); - - const remoteDir = join(tmpdir(), `remote-${Date.now()}`); - await mkdir(remoteDir, { recursive: true }); - await execFileAsync("git", ["init", "--bare"], { cwd: remoteDir }); - - const branchName = await repo.git(["rev-parse", "--abbrev-ref", "HEAD"]); - await repo.git(["remote", "add", "origin", remoteDir]); - await repo.git(["push", "-u", "origin", branchName]); - - const headCommit = await repo.git(["rev-parse", "HEAD"]); - const snapshot = createSnapshot({ baseCommit: headCommit }); - - await expect( - validateForCloudHandoff(snapshot, repo.path), - ).resolves.toBeUndefined(); - - await rm(remoteDir, { recursive: true, force: true }); - }); -}); - -describe("isCommitOnRemote", () => { - let repo: TestRepo; - - beforeEach(async () => { - repo = await createTestRepo("commit-remote"); - }); - - afterEach(async () => { - await repo.cleanup(); - }); - - it("returns false when no remote configured", async () => { - const headCommit = await repo.git(["rev-parse", "HEAD"]); - const result = await isCommitOnRemote(headCommit, repo.path); - expect(result).toBe(false); - }); - - it("returns false for invalid commit", async () => { - const result = await isCommitOnRemote("invalid-commit-hash", repo.path); - expect(result).toBe(false); - }); - - it("returns false for local-only commit", async () => { - const { execFile } = await import("node:child_process"); - const { promisify } = await import("node:util"); - const { tmpdir } = await import("node:os"); - const { mkdir, rm } = await import("node:fs/promises"); - const { join } = await import("node:path"); - - const execFileAsync = promisify(execFile); - - const remoteDir = join(tmpdir(), `remote-${Date.now()}`); - await mkdir(remoteDir, { recursive: true }); - await execFileAsync("git", ["init", "--bare"], { cwd: remoteDir }); - - const branchName = await repo.git(["rev-parse", "--abbrev-ref", "HEAD"]); - await repo.git(["remote", "add", "origin", remoteDir]); - await repo.git(["push", "-u", "origin", branchName]); - - await repo.writeFile("new.ts", "content"); - await repo.git(["add", "."]); - await repo.git(["commit", "-m", "Local only"]); - - const localCommit = await repo.git(["rev-parse", "HEAD"]); - const result = await isCommitOnRemote(localCommit, repo.path); - expect(result).toBe(false); - - await rm(remoteDir, { recursive: true, force: true }); - }); - - it("returns true for pushed commit", async () => { - const { execFile } = await import("node:child_process"); - const { promisify } = await import("node:util"); - const { tmpdir } = await import("node:os"); - const { mkdir, rm } = await import("node:fs/promises"); - const { join } = await import("node:path"); - - const execFileAsync = promisify(execFile); - - const remoteDir = join(tmpdir(), `remote-${Date.now()}`); - await mkdir(remoteDir, { recursive: true }); - await execFileAsync("git", ["init", "--bare"], { cwd: remoteDir }); - - const branchName = await repo.git(["rev-parse", "--abbrev-ref", "HEAD"]); - await repo.git(["remote", "add", "origin", remoteDir]); - await repo.git(["push", "-u", "origin", branchName]); - - const headCommit = await repo.git(["rev-parse", "HEAD"]); - const result = await isCommitOnRemote(headCommit, repo.path); - expect(result).toBe(true); - - await rm(remoteDir, { recursive: true, force: true }); - }); -}); diff --git a/packages/agent/src/sagas/capture-tree-saga.ts b/packages/agent/src/sagas/capture-tree-saga.ts deleted file mode 100644 index e0b0980a1..000000000 --- a/packages/agent/src/sagas/capture-tree-saga.ts +++ /dev/null @@ -1,165 +0,0 @@ -import { existsSync } from "node:fs"; -import { readdir, readFile, rm, rmdir } from "node:fs/promises"; -import { join } from "node:path"; -import { CaptureTreeSaga as GitCaptureTreeSaga } from "@posthog/git/sagas/tree"; -import { Saga } from "@posthog/shared"; -import type { PostHogAPIClient } from "../posthog-api"; -import type { TreeSnapshot } from "../types"; - -export interface CaptureTreeInput { - repositoryPath: string; - taskId: string; - runId: string; - apiClient?: PostHogAPIClient; - lastTreeHash: string | null; - interrupted?: boolean; -} - -export interface CaptureTreeOutput { - snapshot: TreeSnapshot | null; - newTreeHash: string | null; -} - -export class CaptureTreeSaga extends Saga { - readonly sagaName = "CaptureTreeSaga"; - - protected async execute(input: CaptureTreeInput): Promise { - const { - repositoryPath, - lastTreeHash, - interrupted, - apiClient, - taskId, - runId, - } = input; - const tmpDir = join(repositoryPath, ".posthog", "tmp"); - - if (existsSync(join(repositoryPath, ".gitmodules"))) { - this.log.warn( - "Repository has submodules - snapshot may not capture submodule state", - ); - } - - const shouldArchive = !!apiClient; - const archivePath = shouldArchive - ? join(tmpDir, `tree-${Date.now()}.tar.gz`) - : undefined; - - try { - const gitCaptureSaga = new GitCaptureTreeSaga(this.log); - const captureResult = await gitCaptureSaga.run({ - baseDir: repositoryPath, - lastTreeHash, - archivePath, - }); - - if (!captureResult.success) { - throw new Error(`Failed to capture tree: ${captureResult.error}`); - } - - const { - snapshot: gitSnapshot, - archivePath: createdArchivePath, - changed, - } = captureResult.data; - - if (!changed || !gitSnapshot) { - this.log.debug("No changes since last capture", { lastTreeHash }); - return { snapshot: null, newTreeHash: lastTreeHash }; - } - - let archiveUrl: string | undefined; - if (apiClient && createdArchivePath) { - try { - archiveUrl = await this.uploadArchive( - createdArchivePath, - gitSnapshot.treeHash, - apiClient, - taskId, - runId, - ); - } finally { - await rm(createdArchivePath, { force: true }).catch(() => {}); - } - } - - const snapshot: TreeSnapshot = { - treeHash: gitSnapshot.treeHash, - baseCommit: gitSnapshot.baseCommit, - changes: gitSnapshot.changes, - timestamp: gitSnapshot.timestamp, - interrupted, - archiveUrl, - }; - - this.log.info("Tree captured", { - treeHash: snapshot.treeHash, - changes: snapshot.changes.length, - interrupted, - archiveUrl, - }); - - return { snapshot, newTreeHash: snapshot.treeHash }; - } finally { - if (archivePath) { - await rm(archivePath, { force: true }).catch(() => {}); - } - await this.removeTmpDirIfEmpty(tmpDir); - } - } - - private async uploadArchive( - archivePath: string, - treeHash: string, - apiClient: PostHogAPIClient, - taskId: string, - runId: string, - ): Promise { - const archiveUrl = await this.step({ - name: "upload_archive", - execute: async () => { - const archiveContent = await readFile(archivePath); - const base64Content = archiveContent.toString("base64"); - const snapshotBytes = archiveContent.byteLength; - const snapshotWireBytes = Buffer.byteLength(base64Content, "utf-8"); - - const artifacts = await apiClient.uploadTaskArtifacts(taskId, runId, [ - { - name: `trees/${treeHash}.tar.gz`, - type: "tree_snapshot", - content: base64Content, - content_type: "application/gzip", - }, - ]); - - const uploadedArtifact = artifacts[0]; - if (uploadedArtifact?.storage_path) { - this.log.info("Tree archive uploaded", { - storagePath: uploadedArtifact.storage_path, - treeHash, - snapshotBytes, - snapshotWireBytes, - totalBytes: snapshotBytes, - totalWireBytes: snapshotWireBytes, - }); - return uploadedArtifact.storage_path; - } - - return undefined; - }, - rollback: async () => { - await rm(archivePath, { force: true }).catch(() => {}); - }, - }); - - return archiveUrl; - } - - private async removeTmpDirIfEmpty(tmpDir: string): Promise { - const entries = await readdir(tmpDir).catch(() => null); - if (!entries || entries.length > 0) { - return; - } - await rmdir(tmpDir).catch(() => {}); - } -} diff --git a/packages/agent/src/sagas/resume-saga.test.ts b/packages/agent/src/sagas/resume-saga.test.ts index d87b754e5..3e8675e54 100644 --- a/packages/agent/src/sagas/resume-saga.test.ts +++ b/packages/agent/src/sagas/resume-saga.test.ts @@ -1,19 +1,17 @@ import type { SagaLogger } from "@posthog/shared"; import { afterEach, beforeEach, describe, expect, it, type vi } from "vitest"; -import { POSTHOG_NOTIFICATIONS } from "../acp-extensions"; import type { PostHogAPIClient } from "../posthog-api"; import { ResumeSaga } from "./resume-saga"; import { createAgentChunk, createAgentMessage, + createGitCheckpointNotification, createMockApiClient, createMockLogger, - createNotification, createTaskRun, createTestRepo, createToolCall, createToolResult, - createTreeSnapshotNotification, createUserMessage, type TestRepo, } from "./test-fixtures"; @@ -50,7 +48,7 @@ describe("ResumeSaga", () => { expect(result.success).toBe(true); if (result.success) { expect(result.data.conversation).toHaveLength(0); - expect(result.data.latestSnapshot).toBeNull(); + expect(result.data.latestGitCheckpoint).toBeNull(); expect(result.data.logEntryCount).toBe(0); } }); @@ -412,76 +410,24 @@ describe("ResumeSaga", () => { }); }); - describe("snapshot finding", () => { - it("finds latest tree snapshot", async () => { + describe("checkpoint finding", () => { + it("finds latest git checkpoint", async () => { (mockApiClient.getTaskRun as ReturnType).mockResolvedValue( createTaskRun(), ); ( mockApiClient.fetchTaskRunLogs as ReturnType ).mockResolvedValue([ - createTreeSnapshotNotification("hash-1"), - createUserMessage("continue"), - createTreeSnapshotNotification("hash-2", "gs://bucket/hash-2.tar.gz"), - ]); - - const saga = new ResumeSaga(mockLogger); - const result = await saga.run({ - taskId: "task-1", - runId: "run-1", - repositoryPath: repo.path, - apiClient: mockApiClient, - }); - - expect(result.success).toBe(true); - if (!result.success) return; - - expect(result.data.latestSnapshot?.treeHash).toBe("hash-2"); - }); - - it("returns interrupted flag from snapshot", async () => { - (mockApiClient.getTaskRun as ReturnType).mockResolvedValue( - createTaskRun(), - ); - ( - mockApiClient.fetchTaskRunLogs as ReturnType - ).mockResolvedValue([ - createTreeSnapshotNotification("hash-1", "gs://bucket/file.tar.gz", { - interrupted: true, + createGitCheckpointNotification({ + checkpointId: "checkpoint-1", + checkpointRef: "refs/posthog-code-checkpoint/checkpoint-1", + head: "head-1", }), - ]); - - const saga = new ResumeSaga(mockLogger); - const result = await saga.run({ - taskId: "task-1", - runId: "run-1", - repositoryPath: repo.path, - apiClient: mockApiClient, - }); - - expect(result.success).toBe(true); - if (!result.success) return; - - expect(result.data.interrupted).toBe(true); - }); - }); - - describe("snapshot metadata", () => { - it("returns latest snapshot metadata when archive URL present", async () => { - const baseCommit = await repo.git(["rev-parse", "HEAD"]); - - (mockApiClient.getTaskRun as ReturnType).mockResolvedValue( - createTaskRun(), - ); - ( - mockApiClient.fetchTaskRunLogs as ReturnType - ).mockResolvedValue([ - createNotification(POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, { - treeHash: "hash-1", - baseCommit, - archiveUrl: "gs://bucket/hash-1.tar.gz", - changes: [{ path: "restored.ts", status: "A" }], - timestamp: new Date().toISOString(), + createUserMessage("continue"), + createGitCheckpointNotification({ + checkpointId: "checkpoint-2", + checkpointRef: "refs/posthog-code-checkpoint/checkpoint-2", + head: "head-2", }), ]); @@ -496,21 +442,22 @@ describe("ResumeSaga", () => { expect(result.success).toBe(true); if (!result.success) return; - expect(result.data.latestSnapshot?.treeHash).toBe("hash-1"); - expect(result.data.latestSnapshot?.archiveUrl).toBe( - "gs://bucket/hash-1.tar.gz", + expect(result.data.latestGitCheckpoint?.checkpointId).toBe( + "checkpoint-2", ); }); - it("returns snapshot metadata even when no archive URL", async () => { + it("does not mark resume as interrupted from checkpoint state", async () => { (mockApiClient.getTaskRun as ReturnType).mockResolvedValue( createTaskRun(), ); ( mockApiClient.fetchTaskRunLogs as ReturnType ).mockResolvedValue([ - createTreeSnapshotNotification("hash-1"), - createUserMessage("hello"), + createGitCheckpointNotification({ + checkpointId: "checkpoint-1", + checkpointRef: "refs/posthog-code-checkpoint/checkpoint-1", + }), ]); const saga = new ResumeSaga(mockLogger); @@ -524,8 +471,7 @@ describe("ResumeSaga", () => { expect(result.success).toBe(true); if (!result.success) return; - expect(result.data.latestSnapshot).not.toBeNull(); - expect(result.data.conversation).toHaveLength(1); + expect(result.data.interrupted).toBe(false); }); }); @@ -537,10 +483,14 @@ describe("ResumeSaga", () => { ( mockApiClient.fetchTaskRunLogs as ReturnType ).mockResolvedValue([ - createTreeSnapshotNotification("hash-1", undefined, { + createGitCheckpointNotification({ + checkpointId: "checkpoint-1", + checkpointRef: "refs/posthog-code-checkpoint/checkpoint-1", device: { type: "local" }, }), - createTreeSnapshotNotification("hash-2", undefined, { + createGitCheckpointNotification({ + checkpointId: "checkpoint-2", + checkpointRef: "refs/posthog-code-checkpoint/checkpoint-2", device: { type: "cloud" }, }), ]); diff --git a/packages/agent/src/sagas/resume-saga.ts b/packages/agent/src/sagas/resume-saga.ts index 8810675fc..6762effff 100644 --- a/packages/agent/src/sagas/resume-saga.ts +++ b/packages/agent/src/sagas/resume-saga.ts @@ -1,12 +1,11 @@ import type { ContentBlock } from "@agentclientprotocol/sdk"; import { Saga } from "@posthog/shared"; -import { isNotification, POSTHOG_NOTIFICATIONS } from "../acp-extensions"; +import { POSTHOG_NOTIFICATIONS } from "../acp-extensions"; import type { PostHogAPIClient } from "../posthog-api"; import type { DeviceInfo, GitCheckpointEvent, StoredNotification, - TreeSnapshotEvent, } from "../types"; import type { Logger } from "../utils/logger"; @@ -33,7 +32,6 @@ export interface ResumeInput { export interface ResumeOutput { conversation: ConversationTurn[]; - latestSnapshot: TreeSnapshotEvent | null; latestGitCheckpoint: GitCheckpointEvent | null; interrupted: boolean; lastDevice?: DeviceInfo; @@ -68,25 +66,11 @@ export class ResumeSaga extends Saga { this.log.info("Fetched log entries", { count: entries.length }); - // Step 3: Find latest snapshot (read-only, pure computation) - const latestSnapshot = await this.readOnlyStep("find_snapshot", () => - Promise.resolve(this.findLatestTreeSnapshot(entries)), - ); - const latestGitCheckpoint = await this.readOnlyStep( "find_git_checkpoint", () => Promise.resolve(this.findLatestGitCheckpoint(entries)), ); - // Step 4: Apply snapshot if present (wrapped in step for consistent logging) - if (latestSnapshot) { - this.log.info("Found tree snapshot", { - treeHash: latestSnapshot.treeHash, - hasArchiveUrl: !!latestSnapshot.archiveUrl, - changes: latestSnapshot.changes?.length ?? 0, - }); - } - if (latestGitCheckpoint) { this.log.info("Found git checkpoint", { checkpointId: latestGitCheckpoint.checkpointId, @@ -105,16 +89,14 @@ export class ResumeSaga extends Saga { this.log.info("Resume state rebuilt", { turns: conversation.length, - hasSnapshot: !!latestSnapshot, hasGitCheckpoint: !!latestGitCheckpoint, - interrupted: latestSnapshot?.interrupted ?? false, + interrupted: false, }); return { conversation, - latestSnapshot, latestGitCheckpoint, - interrupted: latestSnapshot?.interrupted ?? false, + interrupted: false, lastDevice, logEntryCount: entries.length, }; @@ -123,35 +105,12 @@ export class ResumeSaga extends Saga { private emptyResult(): ResumeOutput { return { conversation: [], - latestSnapshot: null, latestGitCheckpoint: null, interrupted: false, logEntryCount: 0, }; } - private findLatestTreeSnapshot( - entries: StoredNotification[], - ): TreeSnapshotEvent | null { - for (let i = entries.length - 1; i >= 0; i--) { - const entry = entries[i]; - if ( - isNotification( - entry.notification?.method, - POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, - ) - ) { - const params = entry.notification.params as - | TreeSnapshotEvent - | undefined; - if (params?.treeHash) { - return params; - } - } - } - return null; - } - private findLatestGitCheckpoint( entries: StoredNotification[], ): GitCheckpointEvent | null { diff --git a/packages/agent/src/sagas/test-fixtures.ts b/packages/agent/src/sagas/test-fixtures.ts index 1cf56a56d..3baebcdd3 100644 --- a/packages/agent/src/sagas/test-fixtures.ts +++ b/packages/agent/src/sagas/test-fixtures.ts @@ -5,11 +5,10 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { promisify } from "node:util"; import type { SagaLogger } from "@posthog/shared"; -import * as tar from "tar"; import { vi } from "vitest"; import { POSTHOG_NOTIFICATIONS } from "../acp-extensions"; import type { PostHogAPIClient } from "../posthog-api"; -import type { StoredNotification, TaskRun, TreeSnapshot } from "../types"; +import type { GitCheckpointEvent, StoredNotification, TaskRun } from "../types"; const execFileAsync = promisify(execFile); @@ -128,7 +127,7 @@ export function createMockApiClient( return { uploadTaskArtifacts: vi .fn() - .mockResolvedValue([{ storage_path: "gs://bucket/trees/test.tar.gz" }]), + .mockResolvedValue([{ storage_path: "gs://bucket/handoff/test.pack" }]), downloadArtifact: vi.fn(), getTaskRun: vi.fn(), fetchTaskRunLogs: vi.fn(), @@ -136,69 +135,6 @@ export function createMockApiClient( } as unknown as PostHogAPIClient; } -export interface ArchiveFile { - path: string; - content: string; -} - -export interface ArchiveSymlink { - path: string; - target: string; -} - -export async function createArchiveBuffer( - files: Array, - symlinks: Array = [], -): Promise { - const { symlink } = await import("node:fs/promises"); - const tmpDir = join( - tmpdir(), - `archive-${Date.now()}-${Math.random().toString(36).slice(2)}`, - ); - await mkdir(tmpDir, { recursive: true }); - - const filesToArchive = - files.length > 0 ? files : [{ path: ".empty", content: "" }]; - - for (const file of filesToArchive) { - const fullPath = join(tmpDir, file.path); - await mkdir(join(fullPath, ".."), { recursive: true }); - await writeFile(fullPath, file.content); - } - - const symlinkPaths: string[] = []; - for (const link of symlinks) { - const fullPath = join(tmpDir, link.path); - await mkdir(join(fullPath, ".."), { recursive: true }); - await symlink(link.target, fullPath); - symlinkPaths.push(link.path); - } - - const archivePath = join(tmpDir, "archive.tar.gz"); - await tar.create({ gzip: true, file: archivePath, cwd: tmpDir }, [ - ...filesToArchive.map((f) => f.path), - ...symlinkPaths, - ]); - - const content = await readFile(archivePath); - await rm(tmpDir, { recursive: true, force: true }); - - return Buffer.from(content.toString("base64")); -} - -export function createSnapshot( - overrides: Partial = {}, -): TreeSnapshot { - return { - treeHash: "test-tree-hash", - baseCommit: null, - archiveUrl: "gs://bucket/trees/test.tar.gz", - changes: [], - timestamp: new Date().toISOString(), - ...overrides, - }; -} - export function createTaskRun(overrides: Partial = {}): TaskRun { return { id: "run-1", @@ -290,17 +226,22 @@ export function createToolResult( }); } -export function createTreeSnapshotNotification( - treeHash: string, - archiveUrl?: string, - options: { interrupted?: boolean; device?: { type: "local" | "cloud" } } = {}, +export function createGitCheckpointNotification( + overrides: Partial = {}, ): StoredNotification { - return createNotification(POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, { - treeHash, - baseCommit: "abc123", - archiveUrl, - changes: [{ path: "file.ts", status: "A" }], + return createNotification(POSTHOG_NOTIFICATIONS.GIT_CHECKPOINT, { + checkpointId: "checkpoint-1", + commit: "commit-1", + checkpointRef: "refs/posthog-code-checkpoint/checkpoint-1", + headRef: "refs/posthog-code-handoff/head/checkpoint-1", + head: "head-1", + branch: "main", + indexTree: "index-tree-1", + worktreeTree: "worktree-tree-1", timestamp: new Date().toISOString(), - ...options, + upstreamRemote: "origin", + upstreamMergeRef: "refs/heads/main", + remoteUrl: "git@github.com:posthog/posthog.git", + ...overrides, }); } diff --git a/packages/agent/src/server/agent-server.ts b/packages/agent/src/server/agent-server.ts index 018d7d73e..dbd260b39 100644 --- a/packages/agent/src/server/agent-server.ts +++ b/packages/agent/src/server/agent-server.ts @@ -26,7 +26,6 @@ import { resumeFromLog, } from "../resume"; import { SessionLogWriter } from "../session-log-writer"; -import { TreeTracker } from "../tree-tracker"; import type { AgentMode, DeviceInfo, @@ -34,7 +33,6 @@ import type { HandoffLocalGitState, LogLevel, TaskRun, - TreeSnapshotEvent, } from "../types"; import { AsyncMutex } from "../utils/async-mutex"; import { getLlmGatewayUrl } from "../utils/gateway"; @@ -164,7 +162,6 @@ interface ActiveSession { acpSessionId: string; acpConnection: InProcessAcpConnection; clientConnection: ClientSideConnection; - treeTracker: TreeTracker | null; sseController: SseController | null; deviceInfo: DeviceInfo; logWriter: SessionLogWriter; @@ -697,16 +694,6 @@ export class AgentServer { userAgent: `posthog/cloud.hog.dev; version: ${this.config.version ?? packageJson.version}`, }); - const treeTracker = this.config.repositoryPath - ? new TreeTracker({ - repositoryPath: this.config.repositoryPath, - taskId: payload.task_id, - runId: payload.run_id, - apiClient: posthogAPI, - logger: new Logger({ debug: true, prefix: "[TreeTracker]" }), - }) - : null; - const logWriter = new SessionLogWriter({ posthogAPI, logger: new Logger({ debug: true, prefix: "[SessionLogWriter]" }), @@ -831,7 +818,6 @@ export class AgentServer { acpSessionId, acpConnection, clientConnection, - treeTracker, sseController, deviceInfo, logWriter, @@ -977,36 +963,7 @@ export class AgentServer { this.resumeState.conversation, ); - let snapshotApplied = false; - if ( - this.resumeState.latestSnapshot?.archiveUrl && - this.config.repositoryPath && - this.posthogAPI - ) { - try { - const treeTracker = new TreeTracker({ - repositoryPath: this.config.repositoryPath, - taskId: payload.task_id, - runId: payload.run_id, - apiClient: this.posthogAPI, - logger: this.logger.child("TreeTracker"), - }); - await treeTracker.applyTreeSnapshot(this.resumeState.latestSnapshot); - treeTracker.setLastTreeHash(this.resumeState.latestSnapshot.treeHash); - snapshotApplied = true; - this.logger.info("Tree snapshot applied", { - treeHash: this.resumeState.latestSnapshot.treeHash, - changes: this.resumeState.latestSnapshot.changes?.length ?? 0, - hasArchiveUrl: !!this.resumeState.latestSnapshot.archiveUrl, - }); - } catch (error) { - this.logger.warn("Failed to apply tree snapshot", { - error: error instanceof Error ? error.message : String(error), - treeHash: this.resumeState.latestSnapshot.treeHash, - }); - } - } - + let checkpointApplied = false; if ( this.resumeState.latestGitCheckpoint && this.config.repositoryPath && @@ -1023,6 +980,7 @@ export class AgentServer { const metrics = await checkpointTracker.applyFromHandoff( this.resumeState.latestGitCheckpoint, ); + checkpointApplied = true; this.logger.info("Git checkpoint applied", { branch: this.resumeState.latestGitCheckpoint.branch, head: this.resumeState.latestGitCheckpoint.head, @@ -1040,9 +998,9 @@ export class AgentServer { const pendingUserPrompt = this.getPendingUserPrompt(taskRun); - const sandboxContext = snapshotApplied - ? `The workspace environment (all files, packages, and code changes) has been fully restored from where you left off.` - : `The workspace files from the previous session were not restored (the file snapshot may have expired), so you are starting with a fresh environment. Your conversation history is fully preserved below.`; + const sandboxContext = checkpointApplied + ? `The workspace environment (all files, packages, and code changes) has been fully restored from the latest checkpoint.` + : `The workspace from the previous session was not restored from a checkpoint, so you are starting with a fresh environment. Your conversation history is fully preserved below.`; let resumePromptBlocks: ContentBlock[]; if (pendingUserPrompt?.length) { @@ -1079,7 +1037,7 @@ export class AgentServer { conversationTurns: this.resumeState.conversation.length, promptLength: promptBlocksToText(resumePromptBlocks).length, hasPendingUserMessage: !!pendingUserPrompt?.length, - snapshotApplied, + checkpointApplied, hasGitCheckpoint: !!this.resumeState.latestGitCheckpoint, gitCheckpointBranch: this.resumeState.latestGitCheckpoint?.branch ?? null, @@ -1156,7 +1114,6 @@ export class AgentServer { }); this.logger.info("Resume state loaded", { conversationTurns: this.resumeState.conversation.length, - hasSnapshot: !!this.resumeState.latestSnapshot, hasGitCheckpoint: !!this.resumeState.latestGitCheckpoint, gitCheckpointBranch: this.resumeState.latestGitCheckpoint?.branch ?? null, @@ -1602,7 +1559,8 @@ ${attributionInstructions} } // session/update notifications flow through the tapped stream (like local transport) - // Only handle tree state capture for file changes here + // Capture checkpoints for file-changing tools so cloud resumes restore + // from git checkpoints rather than tree snapshots. if (params.update?.sessionUpdate === "tool_call_update") { const meta = (params.update?._meta as Record) ?.claudeCode as Record | undefined; @@ -1612,10 +1570,14 @@ ${attributionInstructions} | undefined; if ( - (toolName === "Write" || toolName === "Edit") && + (toolName === "Write" || + toolName === "Edit" || + toolName === "MultiEdit" || + toolName === "Delete" || + toolName === "Move") && toolResponse?.filePath ) { - await this.captureTreeState(); + await this.captureCheckpointState(); } if ( @@ -1837,15 +1799,9 @@ ${attributionInstructions} this.logger.info("Cleaning up session"); try { - await this.captureHandoffCheckpoint(); - } catch (error) { - this.logger.error("Failed to capture handoff checkpoint", error); - } - - try { - await this.captureTreeState(); + await this.captureCheckpointState(this.session.pendingHandoffGitState); } catch (error) { - this.logger.error("Failed to capture final tree state", error); + this.logger.error("Failed to capture final checkpoint state", error); } try { @@ -1881,50 +1837,15 @@ ${attributionInstructions} this.session = null; } - private async captureTreeState(): Promise { - if (!this.session?.treeTracker) return; - - try { - const snapshot = await this.session.treeTracker.captureTree({}); - if (snapshot) { - const snapshotWithDevice: TreeSnapshotEvent = { - ...snapshot, - device: this.session.deviceInfo, - }; - - const notification = { - jsonrpc: "2.0" as const, - method: POSTHOG_NOTIFICATIONS.TREE_SNAPSHOT, - params: snapshotWithDevice, - }; - - this.broadcastEvent({ - type: "notification", - timestamp: new Date().toISOString(), - notification, - }); - - // Persist full snapshot (including archiveUrl) so resume can restore files. - // archiveUrl is a pre-signed S3 URL that expires — if the user resumes - // after expiry, ApplySnapshotSaga fails gracefully and the agent continues - // with conversation context but a fresh sandbox (snapshotApplied=false). - this.session.logWriter.appendRawLine( - this.session.payload.run_id, - JSON.stringify(notification), - ); - } - } catch (error) { - this.logger.error("Failed to capture tree state", error); - } - } - - private async captureHandoffCheckpoint(): Promise { - if (!this.session?.treeTracker || !this.session.pendingHandoffGitState) { + private async captureCheckpointState( + localGitState?: HandoffLocalGitState, + ): Promise { + if (!this.session || !this.config.repositoryPath) { return; } if (!this.posthogAPI) { this.logger.warn( - "Skipping handoff checkpoint capture: PostHog API client is not configured", + "Skipping checkpoint capture: PostHog API client is not configured", ); return; } @@ -1937,9 +1858,7 @@ ${attributionInstructions} logger: this.logger.child("HandoffCheckpoint"), }); - const checkpoint = await tracker.captureForHandoff( - this.session.pendingHandoffGitState, - ); + const checkpoint = await tracker.captureForHandoff(localGitState); if (!checkpoint) return; const checkpointWithDevice: GitCheckpointEvent = { diff --git a/packages/agent/src/test/fixtures/api.ts b/packages/agent/src/test/fixtures/api.ts index 7ca431f3d..4cc68857f 100644 --- a/packages/agent/src/test/fixtures/api.ts +++ b/packages/agent/src/test/fixtures/api.ts @@ -6,7 +6,7 @@ import { join } from "node:path"; import { promisify } from "node:util"; import { vi } from "vitest"; import type { PostHogAPIClient } from "../../posthog-api"; -import type { TaskRun, TreeSnapshot } from "../../types"; +import type { TaskRun } from "../../types"; const execFileAsync = promisify(execFile); @@ -70,7 +70,7 @@ export function createMockApiClient( return { uploadTaskArtifacts: vi .fn() - .mockResolvedValue([{ storage_path: "gs://bucket/trees/test.tar.gz" }]), + .mockResolvedValue([{ storage_path: "gs://bucket/handoff/test.pack" }]), downloadArtifact: vi.fn(), getTaskRun: vi.fn(), fetchTaskRunLogs: vi.fn(), @@ -97,16 +97,3 @@ export function createTaskRun(overrides: Partial = {}): TaskRun { ...overrides, }; } - -export function createSnapshot( - overrides: Partial = {}, -): TreeSnapshot { - return { - treeHash: "test-tree-hash", - baseCommit: null, - archiveUrl: "gs://bucket/trees/test.tar.gz", - changes: [], - timestamp: new Date().toISOString(), - ...overrides, - }; -} diff --git a/packages/agent/src/tree-tracker.ts b/packages/agent/src/tree-tracker.ts deleted file mode 100644 index 900548f5c..000000000 --- a/packages/agent/src/tree-tracker.ts +++ /dev/null @@ -1,173 +0,0 @@ -/** - * TreeTracker - Git tree-based state capture for cloud/local sync - * - * Captures the entire working state as a git tree hash + archive: - * - Atomic state snapshots (no partial syncs) - * - Efficient delta detection using git's diffing - * - Simpler resume logic (restore tree, continue) - * - * Uses Saga pattern for atomic operations with automatic rollback on failure. - * Uses a temporary git index to avoid modifying the user's staging area. - */ - -import { isCommitOnRemote as gitIsCommitOnRemote } from "@posthog/git/queries"; -import type { PostHogAPIClient } from "./posthog-api"; -import { ApplySnapshotSaga } from "./sagas/apply-snapshot-saga"; -import { CaptureTreeSaga } from "./sagas/capture-tree-saga"; -import type { TreeSnapshot } from "./types"; -import { Logger } from "./utils/logger"; - -export type { TreeSnapshot }; - -export interface TreeTrackerConfig { - repositoryPath: string; - taskId: string; - runId: string; - apiClient?: PostHogAPIClient; - logger?: Logger; -} - -export class TreeTracker { - private repositoryPath: string; - private taskId: string; - private runId: string; - private apiClient?: PostHogAPIClient; - private logger: Logger; - private lastTreeHash: string | null = null; - - constructor(config: TreeTrackerConfig) { - this.repositoryPath = config.repositoryPath; - this.taskId = config.taskId; - this.runId = config.runId; - this.apiClient = config.apiClient; - this.logger = - config.logger || new Logger({ debug: false, prefix: "[TreeTracker]" }); - } - - /** - * Capture current working tree state as a snapshot. - * Uses a temporary index to avoid modifying user's staging area. - * Uses Saga pattern for atomic operation with automatic cleanup on failure. - */ - async captureTree(options?: { - interrupted?: boolean; - }): Promise { - const saga = new CaptureTreeSaga(this.logger); - - const result = await saga.run({ - repositoryPath: this.repositoryPath, - taskId: this.taskId, - runId: this.runId, - apiClient: this.apiClient, - lastTreeHash: this.lastTreeHash, - interrupted: options?.interrupted, - }); - - if (!result.success) { - this.logger.error("Failed to capture tree", { - error: result.error, - failedStep: result.failedStep, - }); - throw new Error( - `Failed to capture tree at step '${result.failedStep}': ${result.error}`, - ); - } - - // Only update lastTreeHash on success - if (result.data.newTreeHash !== null) { - this.lastTreeHash = result.data.newTreeHash; - } - - return result.data.snapshot; - } - - /** - * Download and apply a tree snapshot. - * Uses Saga pattern for atomic operation with rollback on failure. - */ - async applyTreeSnapshot(snapshot: TreeSnapshot): Promise { - if (!this.apiClient) { - throw new Error("Cannot apply snapshot: API client not configured"); - } - - if (!snapshot.archiveUrl) { - this.logger.warn("Cannot apply snapshot: no archive URL", { - treeHash: snapshot.treeHash, - changes: snapshot.changes.length, - }); - throw new Error("Cannot apply snapshot: no archive URL"); - } - - const saga = new ApplySnapshotSaga(this.logger); - - const result = await saga.run({ - snapshot, - repositoryPath: this.repositoryPath, - apiClient: this.apiClient, - taskId: this.taskId, - runId: this.runId, - }); - - if (!result.success) { - this.logger.error("Failed to apply tree snapshot", { - error: result.error, - failedStep: result.failedStep, - treeHash: snapshot.treeHash, - }); - throw new Error( - `Failed to apply snapshot at step '${result.failedStep}': ${result.error}`, - ); - } - - // Only update lastTreeHash on success - this.lastTreeHash = result.data.treeHash; - } - - /** - * Get the last captured tree hash. - */ - getLastTreeHash(): string | null { - return this.lastTreeHash; - } - - /** - * Set the last tree hash (used when resuming). - */ - setLastTreeHash(hash: string | null): void { - this.lastTreeHash = hash; - } -} - -/** - * Check if a commit is available on any remote branch. - * Used to validate that cloud can fetch the base commit during handoff. - */ -export async function isCommitOnRemote( - commit: string, - cwd: string, -): Promise { - return gitIsCommitOnRemote(cwd, commit); -} - -/** - * Validate that a snapshot can be handed off to cloud execution. - * Cloud needs to be able to fetch the baseCommit from a remote. - * - * @throws Error if the snapshot cannot be restored on cloud - */ -export async function validateForCloudHandoff( - snapshot: TreeSnapshot, - repositoryPath: string, -): Promise { - if (!snapshot.baseCommit) { - throw new Error("Cannot hand off to cloud: no base commit"); - } - - const onRemote = await isCommitOnRemote(snapshot.baseCommit, repositoryPath); - if (!onRemote) { - throw new Error( - `Cannot hand off to cloud: commit ${snapshot.baseCommit.slice(0, 7)} is not pushed. ` + - `Run 'git push' to push your branch first.`, - ); - } -} diff --git a/packages/agent/src/types.ts b/packages/agent/src/types.ts index 2d1990895..34ce73227 100644 --- a/packages/agent/src/types.ts +++ b/packages/agent/src/types.ts @@ -60,8 +60,7 @@ export type ArtifactType = | "context" | "reference" | "output" - | "artifact" - | "tree_snapshot"; + | "artifact"; export interface TaskRunArtifact { name: string; @@ -178,21 +177,6 @@ export interface FileChange { status: FileStatus; } -// Tree snapshot - what TreeTracker captures -export interface TreeSnapshot { - treeHash: string; - baseCommit: string | null; - archiveUrl?: string; - changes: FileChange[]; - timestamp: string; - interrupted?: boolean; -} - -// Tree snapshot event - includes device info when sent as notification -export interface TreeSnapshotEvent extends TreeSnapshot { - device?: DeviceInfo; -} - export type HandoffLocalGitState = GitHandoffLocalGitState; export interface GitCheckpoint extends GitHandoffCheckpoint { diff --git a/packages/agent/tsup.config.ts b/packages/agent/tsup.config.ts index 88ed0ceee..40aa308df 100644 --- a/packages/agent/tsup.config.ts +++ b/packages/agent/tsup.config.ts @@ -77,7 +77,6 @@ export default defineConfig([ "src/handoff-checkpoint.ts", "src/posthog-api.ts", "src/resume.ts", - "src/tree-tracker.ts", "src/types.ts", "src/adapters/claude/questions/utils.ts", "src/adapters/claude/permissions/permission-options.ts", diff --git a/packages/git/package.json b/packages/git/package.json index df1ccf0a6..20f80840f 100644 --- a/packages/git/package.json +++ b/packages/git/package.json @@ -27,7 +27,6 @@ ], "dependencies": { "@posthog/shared": "workspace:*", - "simple-git": "^3.30.0", - "tar": "^7.5.6" + "simple-git": "^3.30.0" } } diff --git a/packages/git/src/handoff.test.ts b/packages/git/src/handoff.test.ts index 99c5c5c02..e07904916 100644 --- a/packages/git/src/handoff.test.ts +++ b/packages/git/src/handoff.test.ts @@ -102,21 +102,6 @@ async function makeCloudChanges( await writeFile(path.join(cloudRepo, "untracked.txt"), "untracked\n"); } -async function mirrorWorktreeFiles( - fromRepo: string, - toRepo: string, - files: string[], -): Promise { - await Promise.all( - files.map(async (file) => { - await writeFile( - path.join(toRepo, file), - await readFile(path.join(fromRepo, file), "utf-8"), - ); - }), - ); -} - async function cleanupCapture(capture: GitHandoffCaptureResult): Promise { if (capture.headPack?.path) { await rm(capture.headPack.path, { force: true }).catch(() => {}); @@ -160,18 +145,12 @@ async function captureAndApply( } describe("GitHandoffTracker", () => { - it("captures and reapplies head and index state from local files", async () => { + it("captures and reapplies head, worktree, and index state from local files", async () => { await withRepos(async (repos) => { await makeCloudChanges(repos.cloudRepo, repos.cloudGit); const capture = await captureAndApply(repos); try { - await mirrorWorktreeFiles(repos.cloudRepo, repos.localRepo, [ - "tracked.txt", - "unstaged.txt", - "untracked.txt", - ]); - expect((await repos.localGit.revparse(["HEAD"])).trim()).toBe( capture.checkpoint.head, ); @@ -181,6 +160,15 @@ describe("GitHandoffTracker", () => { expect( await readFile(path.join(repos.localRepo, "committed.txt"), "utf-8"), ).toBe("cloud commit\n"); + expect( + await readFile(path.join(repos.localRepo, "tracked.txt"), "utf-8"), + ).toBe("staged change\n"); + expect( + await readFile(path.join(repos.localRepo, "unstaged.txt"), "utf-8"), + ).toBe("unstaged change\n"); + expect( + await readFile(path.join(repos.localRepo, "untracked.txt"), "utf-8"), + ).toBe("untracked\n"); const status = await repos.localGit.raw(["status", "--porcelain"]); expect(status).toContain("M tracked.txt"); @@ -192,6 +180,27 @@ describe("GitHandoffTracker", () => { }); }, 15000); + it("removes tracked files absent from the checkpoint worktree", async () => { + await withRepos(async (repos) => { + await rm(path.join(repos.cloudRepo, "tracked.txt")); + await repos.cloudGit.raw(["rm", "--cached", "tracked.txt"]); + await repos.cloudGit.commit("Remove tracked file"); + + const capture = await captureAndApply(repos); + + try { + await expect( + readFile(path.join(repos.localRepo, "tracked.txt"), "utf-8"), + ).rejects.toThrow(); + + const status = await repos.localGit.raw(["status", "--porcelain"]); + expect(status).not.toContain("tracked.txt"); + } finally { + await cleanupCapture(capture); + } + }); + }, 15000); + it("prompts before resetting a diverged local branch", async () => { await withRepos(async (repos) => { await writeFile( diff --git a/packages/git/src/handoff.ts b/packages/git/src/handoff.ts index a01343c31..5626dca40 100644 --- a/packages/git/src/handoff.ts +++ b/packages/git/src/handoff.ts @@ -1,5 +1,6 @@ import { spawn } from "node:child_process"; -import { copyFile, mkdir, readFile, rm, stat } from "node:fs/promises"; +import { copyFile, mkdtemp, readFile, rm, stat } from "node:fs/promises"; +import { tmpdir } from "node:os"; import path from "node:path"; import type { SagaLogger } from "@posthog/shared"; import { createGitClient, type GitClient } from "./client"; @@ -92,7 +93,7 @@ export class GitHandoffTracker { } async captureForHandoff( - localGitState?: HandoffLocalGitState, + _localGitState?: HandoffLocalGitState, ): Promise { const captureSaga = new CaptureCheckpointSaga(this.logger); const result = await captureSaga.run({ baseDir: this.repositoryPath }); @@ -104,21 +105,22 @@ export class GitHandoffTracker { const checkpoint = result.data; const git = createGitClient(this.repositoryPath); - const tempDir = await this.getTempDir(git); + const tempDir = await this.createTempDir(checkpoint.checkpointId); const checkpointRef = `${CHECKPOINT_REF_PREFIX}${checkpoint.checkpointId}`; - const shouldIncludeHead = - !!checkpoint.head && checkpoint.head !== localGitState?.head; - const headRef = shouldIncludeHead + const packRefs = [ + checkpoint.head, + checkpoint.indexTree, + checkpoint.worktreeTree, + ].filter((ref): ref is string => !!ref); + const headRef = checkpoint.head ? `${HANDOFF_HEAD_REF_PREFIX}${checkpoint.checkpointId}` : undefined; const packPrefix = path.join(tempDir, checkpoint.checkpointId); try { const [headPack, indexFile, tracking] = await Promise.all([ - shouldIncludeHead && checkpoint.head - ? this.captureHeadPack(packPrefix, checkpoint.head) - : Promise.resolve(undefined), - this.copyIndexFile(git, checkpoint.checkpointId), + this.captureObjectPack(packPrefix, packRefs), + this.copyIndexFile(git, checkpoint.checkpointId, tempDir), getTrackingMetadata(git, checkpoint.branch), ]); @@ -190,6 +192,9 @@ export class GitHandoffTracker { await git.checkout(checkpoint.head); } + await git.clean(["f", "d"]); + await git.raw(["read-tree", "--reset", "-u", checkpoint.worktreeTree]); + if (indexPath) { await this.restoreIndexFile(git, indexPath); } @@ -204,13 +209,13 @@ export class GitHandoffTracker { }; } - private async captureHeadPack( + private async captureObjectPack( packPrefix: string, - headCommit: string, + refs: string[], ): Promise { const hash = await this.runGitWithInput( ["pack-objects", packPrefix, "--revs"], - `${headCommit}\n`, + `${refs.join("\n")}\n`, ); const packPath = `${packPrefix}-${hash.trim()}.pack`; const rawBytes = await this.getFileSize(packPath); @@ -221,9 +226,9 @@ export class GitHandoffTracker { private async copyIndexFile( git: GitClient, checkpointId: string, + tempDir: string, ): Promise { const indexPath = await this.getGitPath(git, "index"); - const tempDir = await this.getTempDir(git); const copiedIndexPath = path.join(tempDir, `${checkpointId}.index`); await copyFile(indexPath, copiedIndexPath); return { @@ -422,15 +427,8 @@ export class GitHandoffTracker { return exitCode === 0; } - private async getTempDir(git: GitClient): Promise { - const raw = await git.raw(["rev-parse", "--git-common-dir"]); - const commonDir = raw.trim() || ".git"; - const resolved = path.isAbsolute(commonDir) - ? commonDir - : path.resolve(this.repositoryPath, commonDir); - const tempDir = path.join(resolved, "posthog-code-tmp"); - await mkdir(tempDir, { recursive: true }); - return tempDir; + private async createTempDir(checkpointId: string): Promise { + return mkdtemp(joinTempPrefix(checkpointId)); } private async getGitPath(git: GitClient, gitPath: string): Promise { @@ -522,6 +520,10 @@ export class GitHandoffTracker { } } +function joinTempPrefix(checkpointId: string): string { + return path.join(tmpdir(), `posthog-code-handoff-${checkpointId}-`); +} + export async function readHandoffLocalGitState( repositoryPath: string, ): Promise { diff --git a/packages/git/src/sagas/tree.ts b/packages/git/src/sagas/tree.ts deleted file mode 100644 index 9e1fd745a..000000000 --- a/packages/git/src/sagas/tree.ts +++ /dev/null @@ -1,403 +0,0 @@ -import { existsSync } from "node:fs"; -import * as fs from "node:fs/promises"; -import * as path from "node:path"; -import * as tar from "tar"; -import type { GitClient } from "../client"; -import { GitSaga, type GitSagaInput } from "../git-saga"; -import { getHeadSha } from "../queries"; - -export type FileStatus = "A" | "M" | "D"; - -export interface FileChange { - path: string; - status: FileStatus; -} - -export interface TreeSnapshot { - treeHash: string; - baseCommit: string | null; - changes: FileChange[]; - timestamp: string; -} - -export interface CaptureTreeInput extends GitSagaInput { - lastTreeHash?: string | null; - archivePath?: string; -} - -export interface CaptureTreeOutput { - snapshot: TreeSnapshot | null; - archivePath?: string; - changed: boolean; -} - -export class CaptureTreeSaga extends GitSaga< - CaptureTreeInput, - CaptureTreeOutput -> { - readonly sagaName = "CaptureTreeSaga"; - private tempIndexPath: string | null = null; - - protected async executeGitOperations( - input: CaptureTreeInput, - ): Promise { - const { baseDir, lastTreeHash, archivePath, signal } = input; - const tmpDir = path.join(baseDir, ".git", "posthog-code-tmp"); - - await this.step({ - name: "create_tmp_dir", - execute: () => fs.mkdir(tmpDir, { recursive: true }), - rollback: async () => {}, - }); - - this.tempIndexPath = path.join(tmpDir, `index-${Date.now()}`); - const tempIndexGit = this.git.env({ - ...process.env, - GIT_INDEX_FILE: this.tempIndexPath, - }); - - await this.step({ - name: "init_temp_index", - execute: () => tempIndexGit.raw(["read-tree", "HEAD"]), - rollback: async () => { - if (this.tempIndexPath) { - await fs.rm(this.tempIndexPath, { force: true }).catch(() => {}); - } - }, - }); - - await this.readOnlyStep("stage_files", () => - tempIndexGit.raw(["add", "-A"]), - ); - - const treeHash = await this.readOnlyStep("write_tree", () => - tempIndexGit.raw(["write-tree"]), - ); - - if (lastTreeHash && treeHash === lastTreeHash) { - this.log.debug("No changes since last capture", { treeHash }); - await fs.rm(this.tempIndexPath, { force: true }).catch(() => {}); - return { snapshot: null, changed: false }; - } - - const baseCommit = await this.readOnlyStep("get_base_commit", async () => { - try { - return await getHeadSha(baseDir, { abortSignal: signal }); - } catch { - return null; - } - }); - - const changes = await this.readOnlyStep("get_changes", () => - this.getChanges(this.git, baseCommit, treeHash), - ); - - await fs.rm(this.tempIndexPath, { force: true }).catch(() => {}); - - const snapshot: TreeSnapshot = { - treeHash, - baseCommit, - changes, - timestamp: new Date().toISOString(), - }; - - let createdArchivePath: string | undefined; - if (archivePath) { - createdArchivePath = await this.createArchive( - baseDir, - archivePath, - changes, - ); - } - - this.log.info("Tree captured", { - treeHash, - changes: changes.length, - archived: !!createdArchivePath, - }); - - return { snapshot, archivePath: createdArchivePath, changed: true }; - } - - private async createArchive( - baseDir: string, - archivePath: string, - changes: FileChange[], - ): Promise { - const filesToArchive = changes - .filter((c) => c.status !== "D") - .map((c) => c.path); - - if (filesToArchive.length === 0) { - return undefined; - } - - const existingFiles = filesToArchive.filter((f) => - existsSync(path.join(baseDir, f)), - ); - - if (existingFiles.length === 0) { - return undefined; - } - - await this.step({ - name: "create_archive", - execute: async () => { - const archiveDir = path.dirname(archivePath); - await fs.mkdir(archiveDir, { recursive: true }); - await tar.create( - { - gzip: true, - file: archivePath, - cwd: baseDir, - }, - existingFiles, - ); - }, - rollback: async () => { - await fs.rm(archivePath, { force: true }).catch(() => {}); - }, - }); - - return archivePath; - } - - private async getChanges( - git: GitClient, - fromRef: string | null, - toRef: string, - ): Promise { - if (!fromRef) { - const stdout = await git.raw(["ls-tree", "-r", "--name-only", toRef]); - return stdout - .split("\n") - .filter((p) => p.trim()) - .map((p) => ({ path: p, status: "A" as FileStatus })); - } - - const stdout = await git.raw([ - "diff-tree", - "-r", - "--name-status", - fromRef, - toRef, - ]); - - const changes: FileChange[] = []; - for (const line of stdout.split("\n")) { - if (!line.trim()) continue; - const [status, filePath] = line.split("\t"); - if (!filePath) continue; - - let normalizedStatus: FileStatus; - if (status === "D") { - normalizedStatus = "D"; - } else if (status === "A") { - normalizedStatus = "A"; - } else { - normalizedStatus = "M"; - } - - changes.push({ path: filePath, status: normalizedStatus }); - } - - return changes; - } -} - -export interface ApplyTreeInput extends GitSagaInput { - treeHash: string; - baseCommit?: string | null; - changes: FileChange[]; - archivePath?: string; -} - -export interface ApplyTreeOutput { - treeHash: string; - checkoutPerformed: boolean; -} - -export class ApplyTreeSaga extends GitSaga { - readonly sagaName = "ApplyTreeSaga"; - private originalHead: string | null = null; - private originalBranch: string | null = null; - private extractedFiles: string[] = []; - private fileBackups: Map = new Map(); - - protected async executeGitOperations( - input: ApplyTreeInput, - ): Promise { - const { baseDir, treeHash, baseCommit, changes, archivePath } = input; - - const headInfo = await this.readOnlyStep("get_current_head", async () => { - let head: string | null = null; - let branch: string | null = null; - - try { - head = await this.git.revparse(["HEAD"]); - } catch { - head = null; - } - - try { - branch = await this.git.raw(["symbolic-ref", "--short", "HEAD"]); - } catch { - branch = null; - } - - return { head, branch }; - }); - this.originalHead = headInfo.head; - this.originalBranch = headInfo.branch; - - let checkoutPerformed = false; - - if (baseCommit && baseCommit !== this.originalHead) { - await this.readOnlyStep("check_working_tree", async () => { - const status = await this.git.status(); - if (!status.isClean()) { - const changedFiles = - status.modified.length + - status.staged.length + - status.deleted.length; - throw new Error( - `Cannot apply tree: ${changedFiles} uncommitted change(s) exist. ` + - `Commit or stash your changes first.`, - ); - } - }); - - await this.step({ - name: "checkout_base", - execute: async () => { - await this.git.checkout(baseCommit); - checkoutPerformed = true; - this.log.warn( - "Applied tree from different commit - now in detached HEAD state", - { - originalHead: this.originalHead, - originalBranch: this.originalBranch, - baseCommit, - }, - ); - }, - rollback: async () => { - try { - if (this.originalBranch) { - await this.git.checkout(this.originalBranch); - } else if (this.originalHead) { - await this.git.checkout(this.originalHead); - } - } catch (error) { - this.log.warn("Failed to rollback checkout", { error }); - } - }, - }); - } - - if (archivePath) { - const filesToExtract = changes - .filter((c) => c.status !== "D") - .map((c) => c.path); - - await this.readOnlyStep("backup_existing_files", async () => { - for (const filePath of filesToExtract) { - const fullPath = path.join(baseDir, filePath); - try { - const content = await fs.readFile(fullPath); - this.fileBackups.set(filePath, content); - } catch {} - } - }); - - await this.step({ - name: "extract_archive", - execute: async () => { - await tar.extract({ - file: archivePath, - cwd: baseDir, - }); - this.extractedFiles = filesToExtract; - }, - rollback: async () => { - for (const filePath of this.extractedFiles) { - const fullPath = path.join(baseDir, filePath); - const backup = this.fileBackups.get(filePath); - if (backup) { - const dir = path.dirname(fullPath); - await fs.mkdir(dir, { recursive: true }).catch(() => {}); - await fs.writeFile(fullPath, backup).catch(() => {}); - } else { - await fs.rm(fullPath, { force: true }).catch(() => {}); - } - } - }, - }); - } - - for (const change of changes.filter((c) => c.status === "D")) { - const fullPath = path.join(baseDir, change.path); - - const backupContent = await this.readOnlyStep( - `backup_${change.path}`, - async () => { - try { - return await fs.readFile(fullPath); - } catch { - return null; - } - }, - ); - - await this.step({ - name: `delete_${change.path}`, - execute: async () => { - await fs.rm(fullPath, { force: true }); - this.log.debug(`Deleted file: ${change.path}`); - }, - rollback: async () => { - if (backupContent) { - const dir = path.dirname(fullPath); - await fs.mkdir(dir, { recursive: true }).catch(() => {}); - await fs.writeFile(fullPath, backupContent).catch(() => {}); - } - }, - }); - } - - const deletedCount = changes.filter((c) => c.status === "D").length; - this.log.info("Tree applied", { - treeHash, - totalChanges: changes.length, - deletedFiles: deletedCount, - checkoutPerformed, - }); - - return { treeHash, checkoutPerformed }; - } -} - -export interface ReadTreeInput extends GitSagaInput { - treeHash: string; -} - -export interface ReadTreeOutput { - files: string[]; -} - -export class ReadTreeSaga extends GitSaga { - readonly sagaName = "ReadTreeSaga"; - - protected async executeGitOperations( - input: ReadTreeInput, - ): Promise { - const { treeHash } = input; - - const stdout = await this.readOnlyStep("ls_tree", () => - this.git.raw(["ls-tree", "-r", "--name-only", treeHash]), - ); - - const files = stdout.split("\n").filter((f) => f.trim()); - return { files }; - } -} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4b2ed4a97..44ad05cd8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -779,9 +779,6 @@ importers: simple-git: specifier: ^3.30.0 version: 3.30.0 - tar: - specifier: ^7.5.6 - version: 7.5.7 devDependencies: '@types/tar': specifier: ^6.1.13 @@ -21121,7 +21118,7 @@ snapshots: path-scurry@1.11.1: dependencies: lru-cache: 10.4.3 - minipass: 7.1.2 + minipass: 7.1.3 path-scurry@2.0.1: dependencies: