diff --git a/apps/server/src/git/Layers/GitCore.ts b/apps/server/src/git/Layers/GitCore.ts index f5b9168abb..81f24ad768 100644 --- a/apps/server/src/git/Layers/GitCore.ts +++ b/apps/server/src/git/Layers/GitCore.ts @@ -2,7 +2,8 @@ import { Cache, Data, Duration, Effect, Exit, FileSystem, Layer, Path } from "ef import { GitCommandError } from "../Errors.ts"; import { GitService } from "../Services/GitService.ts"; -import { GitCore, type GitCoreShape } from "../Services/GitCore.ts"; +import type { ExecuteGitProgress } from "../Services/GitService.ts"; +import { GitCore, type GitCommitOptions, type GitCoreShape } from "../Services/GitCore.ts"; const STATUS_UPSTREAM_REFRESH_INTERVAL = Duration.seconds(15); const STATUS_UPSTREAM_REFRESH_TIMEOUT = Duration.seconds(5); @@ -20,6 +21,7 @@ interface ExecuteGitOptions { timeoutMs?: number | undefined; allowNonZeroExit?: boolean | undefined; fallbackErrorMessage?: string | undefined; + progress?: ExecuteGitProgress | undefined; } function parseBranchAb(value: string): { ahead: number; behind: number } { @@ -235,6 +237,7 @@ const makeGitCore = Effect.gen(function* () { args, allowNonZeroExit: true, ...(options.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}), + ...(options.progress ? { progress: options.progress } : {}), }) .pipe( Effect.flatMap((result) => { @@ -804,14 +807,37 @@ const makeGitCore = Effect.gen(function* () { }; }); - const commit: GitCoreShape["commit"] = (cwd, subject, body) => + const commit: GitCoreShape["commit"] = (cwd, subject, body, options?: GitCommitOptions) => Effect.gen(function* () { const args = ["commit", "-m", subject]; const trimmedBody = body.trim(); if (trimmedBody.length > 0) { args.push("-m", trimmedBody); } - yield* runGit("GitCore.commit.commit", cwd, args); + const progress = options?.progress + ? { + ...(options.progress.onOutputLine + ? { + onStdoutLine: (line: string) => + options.progress?.onOutputLine?.({ stream: "stdout", text: line }) ?? + Effect.void, + onStderrLine: (line: string) => + options.progress?.onOutputLine?.({ stream: "stderr", text: line }) ?? + Effect.void, + } + : {}), + ...(options.progress.onHookStarted + ? { onHookStarted: options.progress.onHookStarted } + : {}), + ...(options.progress.onHookFinished + ? { onHookFinished: options.progress.onHookFinished } + : {}), + } + : null; + yield* executeGit("GitCore.commit.commit", cwd, args, { + ...(options?.timeoutMs !== undefined ? { timeoutMs: options.timeoutMs } : {}), + ...(progress ? { progress } : {}), + }).pipe(Effect.asVoid); const commitSha = yield* runGitStdout("GitCore.commit.revParseHead", cwd, [ "rev-parse", "HEAD", diff --git a/apps/server/src/git/Layers/GitManager.test.ts b/apps/server/src/git/Layers/GitManager.test.ts index 8c72941cd0..515661ec31 100644 --- a/apps/server/src/git/Layers/GitManager.test.ts +++ b/apps/server/src/git/Layers/GitManager.test.ts @@ -6,6 +6,7 @@ import * as NodeServices from "@effect/platform-node/NodeServices"; import { it } from "@effect/vitest"; import { Effect, FileSystem, Layer, PlatformError, Scope } from "effect"; import { expect } from "vitest"; +import type { GitActionProgressEvent } from "@t3tools/contracts"; import { GitCommandError, GitHubCliError, TextGenerationError } from "../Errors.ts"; import { type GitManagerShape } from "../Services/GitManager.ts"; @@ -449,12 +450,20 @@ function runStackedAction( input: { cwd: string; action: "commit" | "commit_push" | "commit_push_pr"; + actionId?: string; commitMessage?: string; featureBranch?: boolean; filePaths?: readonly string[]; }, + options?: Parameters[1], ) { - return manager.runStackedAction(input); + return manager.runStackedAction( + { + ...input, + actionId: input.actionId ?? "test-action-id", + }, + options, + ); } function resolvePullRequest(manager: GitManagerShape, input: { cwd: string; reference: string }) { @@ -1936,4 +1945,117 @@ it.layer(GitManagerTestLayer)("GitManager", (it) => { expect(errorMessage).toContain("already checked out in the main repo"); }), ); + + it.effect("emits ordered progress events for commit hooks", () => + Effect.gen(function* () { + const repoDir = yield* makeTempDir("t3code-git-manager-"); + yield* initRepo(repoDir); + fs.writeFileSync(path.join(repoDir, "hooked.txt"), "hooked\n"); + fs.writeFileSync( + path.join(repoDir, ".git", "hooks", "pre-commit"), + '#!/bin/sh\necho "hook: start" >&2\nsleep 1\necho "hook: end" >&2\n', + { mode: 0o755 }, + ); + + const { manager } = yield* makeManager(); + const events: GitActionProgressEvent[] = []; + + const result = yield* runStackedAction( + manager, + { + cwd: repoDir, + action: "commit", + }, + { + actionId: "action-1", + progressReporter: { + publish: (event) => + Effect.sync(() => { + events.push(event); + }), + }, + }, + ); + + expect(result.commit.status).toBe("created"); + expect(events.map((event) => event.kind)).toContain("action_started"); + expect(events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + kind: "phase_started", + phase: "commit", + }), + expect.objectContaining({ + kind: "hook_started", + hookName: "pre-commit", + }), + expect.objectContaining({ + kind: "hook_output", + text: "hook: start", + }), + expect.objectContaining({ + kind: "hook_output", + text: "hook: end", + }), + expect.objectContaining({ + kind: "hook_finished", + hookName: "pre-commit", + }), + expect.objectContaining({ + kind: "action_finished", + }), + ]), + ); + }), + ); + + it.effect("emits action_failed when a commit hook rejects", () => + Effect.gen(function* () { + const repoDir = yield* makeTempDir("t3code-git-manager-"); + yield* initRepo(repoDir); + fs.writeFileSync(path.join(repoDir, "hook-failure.txt"), "broken\n"); + fs.writeFileSync( + path.join(repoDir, ".git", "hooks", "pre-commit"), + '#!/bin/sh\necho "hook: fail" >&2\nexit 1\n', + { mode: 0o755 }, + ); + + const { manager } = yield* makeManager(); + const events: GitActionProgressEvent[] = []; + + const errorMessage = yield* runStackedAction( + manager, + { + cwd: repoDir, + action: "commit", + }, + { + actionId: "action-2", + progressReporter: { + publish: (event) => + Effect.sync(() => { + events.push(event); + }), + }, + }, + ).pipe( + Effect.flip, + Effect.map((error) => error.message), + ); + + expect(errorMessage).toContain("hook: fail"); + expect(events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + kind: "hook_started", + hookName: "pre-commit", + }), + expect.objectContaining({ + kind: "action_failed", + phase: "commit", + }), + ]), + ); + }), + ); }); diff --git a/apps/server/src/git/Layers/GitManager.ts b/apps/server/src/git/Layers/GitManager.ts index 1a3cf2bb35..4f240e0044 100644 --- a/apps/server/src/git/Layers/GitManager.ts +++ b/apps/server/src/git/Layers/GitManager.ts @@ -2,6 +2,7 @@ import { randomUUID } from "node:crypto"; import { realpathSync } from "node:fs"; import { Effect, FileSystem, Layer, Path } from "effect"; +import type { GitActionProgressEvent, GitActionProgressPhase } from "@t3tools/contracts"; import { resolveAutoFeatureBranchName, sanitizeBranchFragment, @@ -9,11 +10,21 @@ import { } from "@t3tools/shared/git"; import { GitManagerError } from "../Errors.ts"; -import { GitManager, type GitManagerShape } from "../Services/GitManager.ts"; +import { + GitManager, + type GitActionProgressReporter, + type GitManagerShape, + type GitRunStackedActionOptions, +} from "../Services/GitManager.ts"; import { GitCore } from "../Services/GitCore.ts"; import { GitHubCli } from "../Services/GitHubCli.ts"; import { TextGeneration } from "../Services/TextGeneration.ts"; +const COMMIT_TIMEOUT_MS = 10 * 60_000; +const MAX_PROGRESS_TEXT_LENGTH = 500; +type StripProgressContext = T extends any ? Omit : never; +type GitActionProgressPayload = StripProgressContext; + interface OpenPrInfo { number: number; title: string; @@ -200,6 +211,17 @@ function sanitizeCommitMessage(generated: { }; } +function sanitizeProgressText(value: string): string | null { + const trimmed = value.trim(); + if (trimmed.length === 0) { + return null; + } + if (trimmed.length <= MAX_PROGRESS_TEXT_LENGTH) { + return trimmed; + } + return trimmed.slice(0, MAX_PROGRESS_TEXT_LENGTH).trimEnd(); +} + interface CommitAndBranchSuggestion { subject: string; body: string; @@ -337,6 +359,29 @@ export const makeGitManager = Effect.gen(function* () { const gitHubCli = yield* GitHubCli; const textGeneration = yield* TextGeneration; + const createProgressEmitter = ( + input: { cwd: string; action: "commit" | "commit_push" | "commit_push_pr" }, + options?: GitRunStackedActionOptions, + ) => { + const actionId = options?.actionId ?? randomUUID(); + const reporter = options?.progressReporter; + + const emit = (event: GitActionProgressPayload) => + reporter + ? reporter.publish({ + actionId, + cwd: input.cwd, + action: input.action, + ...event, + } as GitActionProgressEvent) + : Effect.void; + + return { + actionId, + emit, + }; + }; + const configurePullRequestHeadUpstream = ( cwd: string, pullRequest: ResolvedPullRequest & PullRequestHeadRemoteInfo, @@ -680,27 +725,111 @@ export const makeGitManager = Effect.gen(function* () { const runCommitStep = ( cwd: string, + action: "commit" | "commit_push" | "commit_push_pr", branch: string | null, commitMessage?: string, preResolvedSuggestion?: CommitAndBranchSuggestion, filePaths?: readonly string[], model?: string, + progressReporter?: GitActionProgressReporter, + actionId?: string, ) => Effect.gen(function* () { - const suggestion = - preResolvedSuggestion ?? - (yield* resolveCommitAndBranchSuggestion({ + const emit = (event: GitActionProgressPayload) => + progressReporter && actionId + ? progressReporter.publish({ + actionId, + cwd, + action, + ...event, + } as GitActionProgressEvent) + : Effect.void; + + let suggestion: CommitAndBranchSuggestion | null | undefined = preResolvedSuggestion; + if (!suggestion) { + const needsGeneration = !commitMessage?.trim(); + if (needsGeneration) { + yield* emit({ + kind: "phase_started", + phase: "commit", + label: "Generating commit message...", + }); + } + suggestion = yield* resolveCommitAndBranchSuggestion({ cwd, branch, ...(commitMessage ? { commitMessage } : {}), ...(filePaths ? { filePaths } : {}), ...(model ? { model } : {}), - })); + }); + } if (!suggestion) { return { status: "skipped_no_changes" as const }; } - const { commitSha } = yield* gitCore.commit(cwd, suggestion.subject, suggestion.body); + yield* emit({ + kind: "phase_started", + phase: "commit", + label: "Committing...", + }); + + let currentHookName: string | null = null; + const commitProgress = + progressReporter && actionId + ? { + onOutputLine: ({ stream, text }: { stream: "stdout" | "stderr"; text: string }) => { + const sanitized = sanitizeProgressText(text); + if (!sanitized) { + return Effect.void; + } + return emit({ + kind: "hook_output", + hookName: currentHookName, + stream, + text: sanitized, + }); + }, + onHookStarted: (hookName: string) => { + currentHookName = hookName; + return emit({ + kind: "hook_started", + hookName, + }); + }, + onHookFinished: ({ + hookName, + exitCode, + durationMs, + }: { + hookName: string; + exitCode: number | null; + durationMs: number | null; + }) => { + if (currentHookName === hookName) { + currentHookName = null; + } + return emit({ + kind: "hook_finished", + hookName, + exitCode, + durationMs, + }); + }, + } + : null; + const { commitSha } = yield* gitCore.commit(cwd, suggestion.subject, suggestion.body, { + timeoutMs: COMMIT_TIMEOUT_MS, + ...(commitProgress ? { progress: commitProgress } : {}), + }); + if (currentHookName !== null) { + yield* emit({ + kind: "hook_finished", + hookName: currentHookName, + exitCode: 0, + durationMs: null, + }); + currentHookName = null; + } return { status: "created" as const, commitSha, @@ -1010,66 +1139,135 @@ export const makeGitManager = Effect.gen(function* () { }); const runStackedAction: GitManagerShape["runStackedAction"] = Effect.fnUntraced( - function* (input) { - const wantsPush = input.action !== "commit"; - const wantsPr = input.action === "commit_push_pr"; + function* (input, options) { + const progress = createProgressEmitter(input, options); + const phases: GitActionProgressPhase[] = [ + ...(input.featureBranch ? (["branch"] as const) : []), + "commit", + ...(input.action !== "commit" ? (["push"] as const) : []), + ...(input.action === "commit_push_pr" ? (["pr"] as const) : []), + ]; + let currentPhase: GitActionProgressPhase | null = null; + + const runAction = Effect.gen(function* () { + yield* progress.emit({ + kind: "action_started", + phases, + }); - const initialStatus = yield* gitCore.statusDetails(input.cwd); - if (!input.featureBranch && wantsPush && !initialStatus.branch) { - return yield* gitManagerError("runStackedAction", "Cannot push from detached HEAD."); - } - if (!input.featureBranch && wantsPr && !initialStatus.branch) { - return yield* gitManagerError( - "runStackedAction", - "Cannot create a pull request from detached HEAD.", - ); - } + const wantsPush = input.action !== "commit"; + const wantsPr = input.action === "commit_push_pr"; + + const initialStatus = yield* gitCore.statusDetails(input.cwd); + if (!input.featureBranch && wantsPush && !initialStatus.branch) { + return yield* gitManagerError("runStackedAction", "Cannot push from detached HEAD."); + } + if (!input.featureBranch && wantsPr && !initialStatus.branch) { + return yield* gitManagerError( + "runStackedAction", + "Cannot create a pull request from detached HEAD.", + ); + } + + let branchStep: { status: "created" | "skipped_not_requested"; name?: string }; + let commitMessageForStep = input.commitMessage; + let preResolvedCommitSuggestion: CommitAndBranchSuggestion | undefined = undefined; + + if (input.featureBranch) { + currentPhase = "branch"; + yield* progress.emit({ + kind: "phase_started", + phase: "branch", + label: "Preparing feature branch...", + }); + const result = yield* runFeatureBranchStep( + input.cwd, + initialStatus.branch, + input.commitMessage, + input.filePaths, + input.textGenerationModel, + ); + branchStep = result.branchStep; + commitMessageForStep = result.resolvedCommitMessage; + preResolvedCommitSuggestion = result.resolvedCommitSuggestion; + } else { + branchStep = { status: "skipped_not_requested" as const }; + } - let branchStep: { status: "created" | "skipped_not_requested"; name?: string }; - let commitMessageForStep = input.commitMessage; - let preResolvedCommitSuggestion: CommitAndBranchSuggestion | undefined = undefined; + const currentBranch = branchStep.name ?? initialStatus.branch; - if (input.featureBranch) { - const result = yield* runFeatureBranchStep( + currentPhase = "commit"; + const commit = yield* runCommitStep( input.cwd, - initialStatus.branch, - input.commitMessage, + input.action, + currentBranch, + commitMessageForStep, + preResolvedCommitSuggestion, input.filePaths, input.textGenerationModel, + options?.progressReporter, + progress.actionId, ); - branchStep = result.branchStep; - commitMessageForStep = result.resolvedCommitMessage; - preResolvedCommitSuggestion = result.resolvedCommitSuggestion; - } else { - branchStep = { status: "skipped_not_requested" as const }; - } - const currentBranch = branchStep.name ?? initialStatus.branch; + const push = wantsPush + ? yield* progress + .emit({ + kind: "phase_started", + phase: "push", + label: "Pushing...", + }) + .pipe( + Effect.flatMap(() => + Effect.gen(function* () { + currentPhase = "push"; + return yield* gitCore.pushCurrentBranch(input.cwd, currentBranch); + }), + ), + ) + : { status: "skipped_not_requested" as const }; + + const pr = wantsPr + ? yield* progress + .emit({ + kind: "phase_started", + phase: "pr", + label: "Creating PR...", + }) + .pipe( + Effect.flatMap(() => + Effect.gen(function* () { + currentPhase = "pr"; + return yield* runPrStep(input.cwd, currentBranch, input.textGenerationModel); + }), + ), + ) + : { status: "skipped_not_requested" as const }; + + const result = { + action: input.action, + branch: branchStep, + commit, + push, + pr, + }; + yield* progress.emit({ + kind: "action_finished", + result, + }); + return result; + }); - const commit = yield* runCommitStep( - input.cwd, - currentBranch, - commitMessageForStep, - preResolvedCommitSuggestion, - input.filePaths, - input.textGenerationModel, + return yield* runAction.pipe( + Effect.catch((error) => + progress + .emit({ + kind: "action_failed", + phase: currentPhase, + message: error.message, + }) + .pipe(Effect.flatMap(() => Effect.fail(error))), + ), ); - - const push = wantsPush - ? yield* gitCore.pushCurrentBranch(input.cwd, currentBranch) - : { status: "skipped_not_requested" as const }; - - const pr = wantsPr - ? yield* runPrStep(input.cwd, currentBranch, input.textGenerationModel) - : { status: "skipped_not_requested" as const }; - - return { - action: input.action, - branch: branchStep, - commit, - push, - pr, - }; }, ); diff --git a/apps/server/src/git/Layers/GitService.ts b/apps/server/src/git/Layers/GitService.ts index d3f07e3151..6d233cc959 100644 --- a/apps/server/src/git/Layers/GitService.ts +++ b/apps/server/src/git/Layers/GitService.ts @@ -6,18 +6,25 @@ * * @module GitServiceLive */ -import { Effect, Layer, Option, Schema, Stream } from "effect"; +import { randomUUID } from "node:crypto"; +import { tmpdir } from "node:os"; + +import { Effect, FileSystem, Layer, Option, Path, Schema, Stream } from "effect"; import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + import { GitCommandError } from "../Errors.ts"; import { - ExecuteGitInput, - ExecuteGitResult, + type ExecuteGitInput, + type ExecuteGitProgress, + type ExecuteGitResult, GitService, - GitServiceShape, + type GitServiceShape, } from "../Services/GitService.ts"; const DEFAULT_TIMEOUT_MS = 30_000; const DEFAULT_MAX_OUTPUT_BYTES = 1_000_000; +const Trace2ChunkSchema = Schema.Record(Schema.String, Schema.Unknown); +const decodeTrace2Chunk = Schema.decodeEffect(Schema.fromJsonString(Trace2ChunkSchema)); function quoteGitCommand(args: ReadonlyArray): string { return `git ${args.join(" ")}`; @@ -39,14 +46,178 @@ function toGitCommandError( }); } +function trace2ChildKey(record: Record): string | null { + const childId = record.child_id; + if (typeof childId === "number" || typeof childId === "string") { + return String(childId); + } + const hookName = record.hook_name; + return typeof hookName === "string" && hookName.trim().length > 0 ? hookName.trim() : null; +} + +const createTrace2Monitor = Effect.fn(function* ( + input: Pick, + progress: ExecuteGitProgress | undefined, +) { + if (!progress?.onHookStarted && !progress?.onHookFinished) { + return { + env: {}, + flush: Effect.void, + }; + } + + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; + const traceDir = tmpdir(); + const traceFileName = `t3code-git-trace2-${process.pid}-${randomUUID()}.json`; + const traceFilePath = path.join(traceDir, traceFileName); + const hookStartByChildKey = new Map(); + let processedChars = 0; + let lineBuffer = ""; + + const handleTraceLine = (line: string) => + Effect.gen(function* () { + const trimmedLine = line.trim(); + if (trimmedLine.length === 0) { + return; + } + + const decodedRecord = yield* Effect.exit(decodeTrace2Chunk(trimmedLine)); + if (decodedRecord._tag === "Failure") { + yield* Effect.logDebug( + `GitService.trace2: failed to parse trace line for ${quoteGitCommand(input.args)} in ${input.cwd}.`, + ); + return; + } + const record = decodedRecord.value; + if (Object.keys(record).length === 0) { + yield* Effect.logDebug( + `GitService.trace2: failed to parse trace line for ${quoteGitCommand(input.args)} in ${input.cwd}: Trace line was not an object.`, + ); + return; + } + + if (record.child_class !== "hook") { + return; + } + + const event = record.event; + const childKey = trace2ChildKey(record); + if (childKey === null) { + return; + } + const started = hookStartByChildKey.get(childKey); + const hookNameFromEvent = typeof record.hook_name === "string" ? record.hook_name.trim() : ""; + const hookName = hookNameFromEvent.length > 0 ? hookNameFromEvent : (started?.hookName ?? ""); + if (hookName.length === 0) { + return; + } + + if (event === "child_start") { + hookStartByChildKey.set(childKey, { hookName, startedAtMs: Date.now() }); + if (progress.onHookStarted) { + yield* progress.onHookStarted(hookName); + } + return; + } + + if (event === "child_exit") { + hookStartByChildKey.delete(childKey); + if (progress.onHookFinished) { + const code = record.code; + yield* progress.onHookFinished({ + hookName: started?.hookName ?? hookName, + exitCode: typeof code === "number" && Number.isInteger(code) ? code : null, + durationMs: started ? Math.max(0, Date.now() - started.startedAtMs) : null, + }); + } + } + }); + + const readTraceDelta = fileSystem.readFileString(traceFilePath).pipe( + Effect.catch(() => Effect.succeed("")), + Effect.flatMap((delta) => + Effect.gen(function* () { + if (delta.length <= processedChars) { + return; + } + const appended = delta.slice(processedChars); + processedChars = delta.length; + lineBuffer += appended; + let newlineIndex = lineBuffer.indexOf("\n"); + while (newlineIndex >= 0) { + const line = lineBuffer.slice(0, newlineIndex); + lineBuffer = lineBuffer.slice(newlineIndex + 1); + yield* handleTraceLine(line); + newlineIndex = lineBuffer.indexOf("\n"); + } + }), + ), + ); + const watchTraceFile = Stream.runForEach(fileSystem.watch(traceDir), (event) => { + const eventPath = event.path; + const isTargetTraceEvent = + eventPath === traceFileName || + eventPath === traceFilePath || + path.resolve(traceDir, eventPath) === traceFilePath; + if (!isTargetTraceEvent) { + return Effect.void; + } + return readTraceDelta; + }).pipe(Effect.ignoreCause({ log: true })); + + yield* Effect.forkScoped(watchTraceFile); + + yield* Effect.addFinalizer(() => + Effect.gen(function* () { + yield* readTraceDelta; + const finalLine = lineBuffer.trim(); + if (finalLine.length > 0) { + yield* handleTraceLine(finalLine); + } + yield* fileSystem.remove(traceFilePath).pipe(Effect.catch(() => Effect.void)); + }), + ); + + return { + env: { + GIT_TRACE2_EVENT: traceFilePath, + }, + flush: readTraceDelta, + }; +}); + const collectOutput = Effect.fn(function* ( input: Pick, stream: Stream.Stream, maxOutputBytes: number, + onLine: ((line: string) => Effect.Effect) | undefined, ): Effect.fn.Return { const decoder = new TextDecoder(); let bytes = 0; let text = ""; + let lineBuffer = ""; + + const emitCompleteLines = (flush: boolean) => + Effect.gen(function* () { + let newlineIndex = lineBuffer.indexOf("\n"); + while (newlineIndex >= 0) { + const line = lineBuffer.slice(0, newlineIndex).replace(/\r$/, ""); + lineBuffer = lineBuffer.slice(newlineIndex + 1); + if (line.length > 0 && onLine) { + yield* onLine(line); + } + newlineIndex = lineBuffer.indexOf("\n"); + } + + if (flush) { + const trailing = lineBuffer.replace(/\r$/, ""); + lineBuffer = ""; + if (trailing.length > 0 && onLine) { + yield* onLine(trailing); + } + } + }); yield* Stream.runForEach(stream, (chunk) => Effect.gen(function* () { @@ -59,16 +230,24 @@ const collectOutput = Effect.fn(function* ( detail: `${quoteGitCommand(input.args)} output exceeded ${maxOutputBytes} bytes and was truncated.`, }); } - text += decoder.decode(chunk, { stream: true }); + const decoded = decoder.decode(chunk, { stream: true }); + text += decoded; + lineBuffer += decoded; + yield* emitCompleteLines(false); }), ).pipe(Effect.mapError(toGitCommandError(input, "output stream failed."))); - text += decoder.decode(); + const remainder = decoder.decode(); + text += remainder; + lineBuffer += remainder; + yield* emitCompleteLines(true); return text; }); const makeGitService = Effect.gen(function* () { const commandSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const fileSystem = yield* FileSystem.FileSystem; + const path = yield* Path.Path; const execute: GitServiceShape["execute"] = Effect.fnUntraced(function* (input) { const commandInput = { @@ -79,19 +258,27 @@ const makeGitService = Effect.gen(function* () { const maxOutputBytes = input.maxOutputBytes ?? DEFAULT_MAX_OUTPUT_BYTES; const commandEffect = Effect.gen(function* () { + const trace2Monitor = yield* createTrace2Monitor(commandInput, input.progress).pipe( + Effect.provideService(FileSystem.FileSystem, fileSystem), + Effect.provideService(Path.Path, path), + ); const child = yield* commandSpawner .spawn( ChildProcess.make("git", commandInput.args, { cwd: commandInput.cwd, - ...(input.env ? { env: input.env } : {}), + env: { + ...process.env, + ...input.env, + ...trace2Monitor.env, + }, }), ) .pipe(Effect.mapError(toGitCommandError(commandInput, "failed to spawn."))); const [stdout, stderr, exitCode] = yield* Effect.all( [ - collectOutput(commandInput, child.stdout, maxOutputBytes), - collectOutput(commandInput, child.stderr, maxOutputBytes), + collectOutput(commandInput, child.stdout, maxOutputBytes, input.progress?.onStdoutLine), + collectOutput(commandInput, child.stderr, maxOutputBytes, input.progress?.onStderrLine), child.exitCode.pipe( Effect.map((value) => Number(value)), Effect.mapError(toGitCommandError(commandInput, "failed to report exit code.")), @@ -99,6 +286,7 @@ const makeGitService = Effect.gen(function* () { ], { concurrency: "unbounded" }, ); + yield* trace2Monitor.flush; if (!input.allowNonZeroExit && exitCode !== 0) { const trimmedStderr = stderr.trim(); diff --git a/apps/server/src/git/Services/GitCore.ts b/apps/server/src/git/Services/GitCore.ts index 879927934e..ff91b83aca 100644 --- a/apps/server/src/git/Services/GitCore.ts +++ b/apps/server/src/git/Services/GitCore.ts @@ -33,6 +33,24 @@ export interface GitPreparedCommitContext { stagedPatch: string; } +export interface GitCommitProgress { + readonly onOutputLine?: (input: { + stream: "stdout" | "stderr"; + text: string; + }) => Effect.Effect; + readonly onHookStarted?: (hookName: string) => Effect.Effect; + readonly onHookFinished?: (input: { + hookName: string; + exitCode: number | null; + durationMs: number | null; + }) => Effect.Effect; +} + +export interface GitCommitOptions { + readonly timeoutMs?: number; + readonly progress?: GitCommitProgress; +} + export interface GitPushResult { status: "pushed" | "skipped_up_to_date"; branch: string; @@ -111,6 +129,7 @@ export interface GitCoreShape { cwd: string, subject: string, body: string, + options?: GitCommitOptions, ) => Effect.Effect<{ commitSha: string }, GitCommandError>; /** diff --git a/apps/server/src/git/Services/GitManager.ts b/apps/server/src/git/Services/GitManager.ts index 650581c716..2e83b78c3b 100644 --- a/apps/server/src/git/Services/GitManager.ts +++ b/apps/server/src/git/Services/GitManager.ts @@ -7,6 +7,7 @@ * @module GitManager */ import { + GitActionProgressEvent, GitPreparePullRequestThreadInput, GitPreparePullRequestThreadResult, GitPullRequestRefInput, @@ -20,6 +21,15 @@ import { ServiceMap } from "effect"; import type { Effect } from "effect"; import type { GitManagerServiceError } from "../Errors.ts"; +export interface GitActionProgressReporter { + readonly publish: (event: GitActionProgressEvent) => Effect.Effect; +} + +export interface GitRunStackedActionOptions { + readonly actionId?: string; + readonly progressReporter?: GitActionProgressReporter; +} + /** * GitManagerShape - Service API for high-level Git workflow actions. */ @@ -51,6 +61,7 @@ export interface GitManagerShape { */ readonly runStackedAction: ( input: GitRunStackedActionInput, + options?: GitRunStackedActionOptions, ) => Effect.Effect; } diff --git a/apps/server/src/git/Services/GitService.ts b/apps/server/src/git/Services/GitService.ts index f43a4e6dcc..8414256d64 100644 --- a/apps/server/src/git/Services/GitService.ts +++ b/apps/server/src/git/Services/GitService.ts @@ -11,6 +11,17 @@ import type { Effect } from "effect"; import type { GitCommandError } from "../Errors.ts"; +export interface ExecuteGitProgress { + readonly onStdoutLine?: (line: string) => Effect.Effect; + readonly onStderrLine?: (line: string) => Effect.Effect; + readonly onHookStarted?: (hookName: string) => Effect.Effect; + readonly onHookFinished?: (input: { + hookName: string; + exitCode: number | null; + durationMs: number | null; + }) => Effect.Effect; +} + export interface ExecuteGitInput { readonly operation: string; readonly cwd: string; @@ -19,6 +30,7 @@ export interface ExecuteGitInput { readonly allowNonZeroExit?: boolean; readonly timeoutMs?: number; readonly maxOutputBytes?: number; + readonly progress?: ExecuteGitProgress; } export interface ExecuteGitResult { diff --git a/apps/server/src/wsServer.test.ts b/apps/server/src/wsServer.test.ts index f12792a318..87e515d808 100644 --- a/apps/server/src/wsServer.test.ts +++ b/apps/server/src/wsServer.test.ts @@ -1791,15 +1791,94 @@ describe("WebSocket Server", () => { connections.push(ws); const response = await sendRequest(ws, WS_METHODS.gitRunStackedAction, { + actionId: "client-action-1", cwd: "/test", action: "commit_push", }); expect(response.result).toBeUndefined(); expect(response.error?.message).toContain("detached HEAD"); - expect(runStackedAction).toHaveBeenCalledWith({ + expect(runStackedAction).toHaveBeenCalledWith( + { + actionId: "client-action-1", + cwd: "/test", + action: "commit_push", + }, + expect.objectContaining({ + actionId: "client-action-1", + progressReporter: expect.any(Object), + }), + ); + }); + + it("publishes git action progress only to the initiating websocket", async () => { + const runStackedAction = vi.fn( + (_input, options) => + options?.progressReporter + ?.publish({ + actionId: options.actionId ?? "action-1", + cwd: "/test", + action: "commit", + kind: "phase_started", + phase: "commit", + label: "Committing...", + }) + .pipe( + Effect.flatMap(() => + Effect.succeed({ + action: "commit" as const, + branch: { status: "skipped_not_requested" as const }, + commit: { + status: "created" as const, + commitSha: "abc1234", + subject: "Test commit", + }, + push: { status: "skipped_not_requested" as const }, + pr: { status: "skipped_not_requested" as const }, + }), + ), + ) ?? Effect.void, + ); + const gitManager: GitManagerShape = { + status: vi.fn(() => Effect.void as any), + resolvePullRequest: vi.fn(() => Effect.void as any), + preparePullRequestThread: vi.fn(() => Effect.void as any), + runStackedAction, + }; + + server = await createTestServer({ cwd: "/test", gitManager }); + const addr = server.address(); + const port = typeof addr === "object" && addr !== null ? addr.port : 0; + + const [initiatingWs] = await connectAndAwaitWelcome(port); + const [otherWs] = await connectAndAwaitWelcome(port); + connections.push(initiatingWs, otherWs); + + const responsePromise = sendRequest(initiatingWs, WS_METHODS.gitRunStackedAction, { + actionId: "client-action-2", cwd: "/test", - action: "commit_push", + action: "commit", + }); + const progressPush = await waitForPush(initiatingWs, WS_CHANNELS.gitActionProgress); + + expect(progressPush.data).toEqual({ + actionId: "client-action-2", + cwd: "/test", + action: "commit", + kind: "phase_started", + phase: "commit", + label: "Committing...", }); + + await expect( + waitForPush(otherWs, WS_CHANNELS.gitActionProgress, undefined, 10, 100), + ).rejects.toThrow("Timed out waiting for WebSocket message after 100ms"); + await expect(responsePromise).resolves.toEqual( + expect.objectContaining({ + result: expect.objectContaining({ + action: "commit", + }), + }), + ); }); it("rejects websocket connections without a valid auth token", async () => { diff --git a/apps/server/src/wsServer.ts b/apps/server/src/wsServer.ts index 2e6ac51b7f..900353eb37 100644 --- a/apps/server/src/wsServer.ts +++ b/apps/server/src/wsServer.ts @@ -704,7 +704,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< Effect.all([closeAllClients, closeWebSocketServer.pipe(Effect.ignoreCause({ log: true }))]), ); - const routeRequest = Effect.fnUntraced(function* (request: WebSocketRequest) { + const routeRequest = Effect.fnUntraced(function* (ws: WebSocket, request: WebSocketRequest) { switch (request.body._tag) { case ORCHESTRATION_WS_METHODS.getSnapshot: return yield* projectionReadModelQuery.getSnapshot(); @@ -793,7 +793,13 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< case WS_METHODS.gitRunStackedAction: { const body = stripRequestTag(request.body); - return yield* gitManager.runStackedAction(body); + return yield* gitManager.runStackedAction(body, { + actionId: body.actionId, + progressReporter: { + publish: (event) => + pushBus.publishClient(ws, WS_CHANNELS.gitActionProgress, event).pipe(Effect.asVoid), + }, + }); } case WS_METHODS.gitResolvePullRequest: { @@ -915,7 +921,7 @@ export const createServer = Effect.fn(function* (): Effect.fn.Return< }); } - const result = yield* Effect.exit(routeRequest(request.success)); + const result = yield* Effect.exit(routeRequest(ws, request.success)); if (Exit.isFailure(result)) { return yield* sendWsResponse({ id: request.success.id, diff --git a/apps/web/src/components/GitActionsControl.tsx b/apps/web/src/components/GitActionsControl.tsx index 0771875135..afbc718147 100644 --- a/apps/web/src/components/GitActionsControl.tsx +++ b/apps/web/src/components/GitActionsControl.tsx @@ -1,6 +1,11 @@ -import type { GitStackedAction, GitStatusResult, ThreadId } from "@t3tools/contracts"; +import type { + GitActionProgressEvent, + GitStackedAction, + GitStatusResult, + ThreadId, +} from "@t3tools/contracts"; import { useIsMutating, useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; -import { useCallback, useEffect, useMemo, useState } from "react"; +import { useCallback, useEffect, useEffectEvent, useMemo, useRef, useState } from "react"; import { ChevronDownIcon, CloudUploadIcon, GitCommitIcon, InfoIcon } from "lucide-react"; import { GitHubIcon } from "./Icons"; import { @@ -43,6 +48,7 @@ import { gitStatusQueryOptions, invalidateGitQueries, } from "~/lib/gitReactQuery"; +import { randomUUID } from "~/lib/utils"; import { resolvePathLinkTarget } from "~/terminal-links"; import { readNativeApi } from "~/nativeApi"; @@ -63,6 +69,50 @@ interface PendingDefaultBranchAction { type GitActionToastId = ReturnType; +interface ActiveGitActionProgress { + toastId: GitActionToastId; + actionId: string; + title: string; + phaseStartedAtMs: number | null; + hookStartedAtMs: number | null; + hookName: string | null; + lastOutputLine: string | null; + currentPhaseLabel: string | null; +} + +interface RunGitActionWithToastInput { + action: GitStackedAction; + commitMessage?: string; + forcePushOnlyProgress?: boolean; + onConfirmed?: () => void; + skipDefaultBranchPrompt?: boolean; + statusOverride?: GitStatusResult | null; + featureBranch?: boolean; + isDefaultBranchOverride?: boolean; + progressToastId?: GitActionToastId; + filePaths?: string[]; +} + +function formatElapsedDescription(startedAtMs: number | null): string | undefined { + if (startedAtMs === null) { + return undefined; + } + const elapsedSeconds = Math.max(0, Math.floor((Date.now() - startedAtMs) / 1000)); + if (elapsedSeconds < 60) { + return `Running for ${elapsedSeconds}s`; + } + const minutes = Math.floor(elapsedSeconds / 60); + const seconds = elapsedSeconds % 60; + return `Running for ${minutes}m ${seconds}s`; +} + +function resolveProgressDescription(progress: ActiveGitActionProgress): string | undefined { + if (progress.lastOutputLine) { + return progress.lastOutputLine; + } + return formatElapsedDescription(progress.hookStartedAtMs ?? progress.phaseStartedAtMs); +} + function getMenuActionDisabledReason({ item, gitStatus, @@ -167,6 +217,21 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions const [isEditingFiles, setIsEditingFiles] = useState(false); const [pendingDefaultBranchAction, setPendingDefaultBranchAction] = useState(null); + const activeGitActionProgressRef = useRef(null); + + const updateActiveProgressToast = useCallback(() => { + const progress = activeGitActionProgressRef.current; + if (!progress) { + return; + } + toastManager.update(progress.toastId, { + type: "loading", + title: progress.title, + description: resolveProgressDescription(progress), + timeout: 0, + data: threadToastData, + }); + }, [threadToastData]); const { data: gitStatus = null, error: gitStatusError } = useQuery(gitStatusQueryOptions(gitCwd)); @@ -232,6 +297,84 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions }) : null; + useEffect(() => { + const api = readNativeApi(); + if (!api) { + return; + } + + const applyProgressEvent = (event: GitActionProgressEvent) => { + const progress = activeGitActionProgressRef.current; + if (!progress) { + return; + } + if (gitCwd && event.cwd !== gitCwd) { + return; + } + if (progress.actionId !== event.actionId) { + return; + } + + const now = Date.now(); + switch (event.kind) { + case "action_started": + progress.phaseStartedAtMs = now; + progress.hookStartedAtMs = null; + progress.hookName = null; + progress.lastOutputLine = null; + break; + case "phase_started": + progress.title = event.label; + progress.currentPhaseLabel = event.label; + progress.phaseStartedAtMs = now; + progress.hookStartedAtMs = null; + progress.hookName = null; + progress.lastOutputLine = null; + break; + case "hook_started": + progress.title = `Running ${event.hookName}...`; + progress.hookName = event.hookName; + progress.hookStartedAtMs = now; + progress.lastOutputLine = null; + break; + case "hook_output": + progress.lastOutputLine = event.text; + break; + case "hook_finished": + progress.title = progress.currentPhaseLabel ?? "Committing..."; + progress.hookName = null; + progress.hookStartedAtMs = null; + progress.lastOutputLine = null; + break; + case "action_finished": + progress.phaseStartedAtMs = null; + progress.hookStartedAtMs = null; + break; + case "action_failed": + progress.phaseStartedAtMs = null; + progress.hookStartedAtMs = null; + break; + } + + updateActiveProgressToast(); + }; + + return api.git.onActionProgress(applyProgressEvent); + }, [gitCwd, updateActiveProgressToast]); + + useEffect(() => { + const interval = window.setInterval(() => { + if (!activeGitActionProgressRef.current) { + return; + } + updateActiveProgressToast(); + }, 1000); + + return () => { + window.clearInterval(interval); + }; + }, [updateActiveProgressToast]); + const openExistingPr = useCallback(async () => { const api = readNativeApi(); if (!api) { @@ -259,9 +402,9 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions data: threadToastData, }); }); - }, [gitStatusForActions?.pr?.state, gitStatusForActions?.pr?.url, threadToastData]); + }, [gitStatusForActions, threadToastData]); - const runGitActionWithToast = useCallback( + const runGitActionWithToast = useEffectEvent( async ({ action, commitMessage, @@ -273,18 +416,7 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions isDefaultBranchOverride, progressToastId, filePaths, - }: { - action: GitStackedAction; - commitMessage?: string; - forcePushOnlyProgress?: boolean; - onConfirmed?: () => void; - skipDefaultBranchPrompt?: boolean; - statusOverride?: GitStatusResult | null; - featureBranch?: boolean; - isDefaultBranchOverride?: boolean; - progressToastId?: GitActionToastId; - filePaths?: string[]; - }) => { + }: RunGitActionWithToastInput) => { const actionStatus = statusOverride ?? gitStatusForActions; const actionBranch = actionStatus?.branch ?? null; const actionIsDefaultBranch = @@ -319,40 +451,40 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions forcePushOnly: forcePushOnlyProgress, featureBranch, }); + const actionId = randomUUID(); const resolvedProgressToastId = progressToastId ?? toastManager.add({ type: "loading", title: progressStages[0] ?? "Running git action...", + description: "Waiting for Git...", timeout: 0, data: threadToastData, }); + activeGitActionProgressRef.current = { + toastId: resolvedProgressToastId, + actionId, + title: progressStages[0] ?? "Running git action...", + phaseStartedAtMs: null, + hookStartedAtMs: null, + hookName: null, + lastOutputLine: null, + currentPhaseLabel: progressStages[0] ?? "Running git action...", + }; + if (progressToastId) { toastManager.update(progressToastId, { type: "loading", title: progressStages[0] ?? "Running git action...", + description: "Waiting for Git...", timeout: 0, data: threadToastData, }); } - let stageIndex = 0; - const stageInterval = setInterval(() => { - stageIndex = Math.min(stageIndex + 1, progressStages.length - 1); - toastManager.update(resolvedProgressToastId, { - title: progressStages[stageIndex] ?? "Running git action...", - type: "loading", - timeout: 0, - data: threadToastData, - }); - }, 1100); - - const stopProgressUpdates = () => { - clearInterval(stageInterval); - }; - const promise = runImmediateGitActionMutation.mutateAsync({ + actionId, action, ...(commitMessage ? { commitMessage } : {}), ...(featureBranch ? { featureBranch } : {}), @@ -361,7 +493,7 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions try { const result = await promise; - stopProgressUpdates(); + activeGitActionProgressRef.current = null; const resultToast = summarizeGitResult(result); const existingOpenPrUrl = @@ -437,7 +569,7 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions : {}), }); } catch (err) { - stopProgressUpdates(); + activeGitActionProgressRef.current = null; toastManager.update(resolvedProgressToastId, { type: "error", title: "Action failed", @@ -446,14 +578,6 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions }); } }, - - [ - isDefaultBranch, - runImmediateGitActionMutation, - setPendingDefaultBranchAction, - threadToastData, - gitStatusForActions, - ], ); const continuePendingDefaultBranchAction = useCallback(() => { @@ -469,38 +593,23 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions ...(filePaths ? { filePaths } : {}), skipDefaultBranchPrompt: true, }); - }, [pendingDefaultBranchAction, runGitActionWithToast]); - - const checkoutNewBranchAndRunAction = useCallback( - (actionParams: { - action: GitStackedAction; - commitMessage?: string; - forcePushOnlyProgress?: boolean; - onConfirmed?: () => void; - filePaths?: string[]; - }) => { - void runGitActionWithToast({ - ...actionParams, - featureBranch: true, - skipDefaultBranchPrompt: true, - }); - }, - [runGitActionWithToast], - ); + }, [pendingDefaultBranchAction]); const checkoutFeatureBranchAndContinuePendingAction = useCallback(() => { if (!pendingDefaultBranchAction) return; const { action, commitMessage, forcePushOnlyProgress, onConfirmed, filePaths } = pendingDefaultBranchAction; setPendingDefaultBranchAction(null); - checkoutNewBranchAndRunAction({ + void runGitActionWithToast({ action, ...(commitMessage ? { commitMessage } : {}), forcePushOnlyProgress, ...(onConfirmed ? { onConfirmed } : {}), ...(filePaths ? { filePaths } : {}), + featureBranch: true, + skipDefaultBranchPrompt: true, }); - }, [pendingDefaultBranchAction, checkoutNewBranchAndRunAction]); + }, [pendingDefaultBranchAction]); const runDialogActionOnNewBranch = useCallback(() => { if (!isCommitDialogOpen) return; @@ -511,18 +620,14 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions setExcludedFiles(new Set()); setIsEditingFiles(false); - checkoutNewBranchAndRunAction({ + void runGitActionWithToast({ action: "commit", ...(commitMessage ? { commitMessage } : {}), ...(!allSelected ? { filePaths: selectedFiles.map((f) => f.path) } : {}), + featureBranch: true, + skipDefaultBranchPrompt: true, }); - }, [ - allSelected, - isCommitDialogOpen, - dialogCommitMessage, - checkoutNewBranchAndRunAction, - selectedFiles, - ]); + }, [allSelected, isCommitDialogOpen, dialogCommitMessage, selectedFiles]); const runQuickAction = useCallback(() => { if (quickAction.kind === "open_pr") { @@ -562,7 +667,7 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions if (quickAction.action) { void runGitActionWithToast({ action: quickAction.action }); } - }, [openExistingPr, pullMutation, quickAction, runGitActionWithToast, threadToastData]); + }, [openExistingPr, pullMutation, quickAction, threadToastData]); const openDialogForMenuItem = useCallback( (item: GitActionMenuItem) => { @@ -583,7 +688,7 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions setIsEditingFiles(false); setIsCommitDialogOpen(true); }, - [openExistingPr, runGitActionWithToast, setIsCommitDialogOpen], + [openExistingPr, setIsCommitDialogOpen], ); const runDialogAction = useCallback(() => { @@ -602,7 +707,6 @@ export default function GitActionsControl({ gitCwd, activeThreadId }: GitActions allSelected, dialogCommitMessage, isCommitDialogOpen, - runGitActionWithToast, selectedFiles, setDialogCommitMessage, setIsCommitDialogOpen, diff --git a/apps/web/src/lib/gitReactQuery.ts b/apps/web/src/lib/gitReactQuery.ts index d520f73c1c..d6a72859f3 100644 --- a/apps/web/src/lib/gitReactQuery.ts +++ b/apps/web/src/lib/gitReactQuery.ts @@ -117,11 +117,13 @@ export function gitRunStackedActionMutationOptions(input: { return mutationOptions({ mutationKey: gitMutationKeys.runStackedAction(input.cwd), mutationFn: async ({ + actionId, action, commitMessage, featureBranch, filePaths, }: { + actionId: string; action: GitStackedAction; commitMessage?: string; featureBranch?: boolean; @@ -130,6 +132,7 @@ export function gitRunStackedActionMutationOptions(input: { const api = ensureNativeApi(); if (!input.cwd) throw new Error("Git action is unavailable."); return api.git.runStackedAction({ + actionId, cwd: input.cwd, action, ...(commitMessage ? { commitMessage } : {}), diff --git a/apps/web/src/wsNativeApi.test.ts b/apps/web/src/wsNativeApi.test.ts index 2323380da0..62ea0d101b 100644 --- a/apps/web/src/wsNativeApi.test.ts +++ b/apps/web/src/wsNativeApi.test.ts @@ -239,9 +239,11 @@ describe("wsNativeApi", () => { const api = createWsNativeApi(); const onTerminalEvent = vi.fn(); const onDomainEvent = vi.fn(); + const onActionProgress = vi.fn(); api.terminal.onEvent(onTerminalEvent); api.orchestration.onDomainEvent(onDomainEvent); + api.git.onActionProgress(onActionProgress); const terminalEvent = { threadId: "thread-1", @@ -274,11 +276,28 @@ describe("wsNativeApi", () => { }, } satisfies Extract; emitPush(ORCHESTRATION_WS_CHANNELS.domainEvent, orchestrationEvent); + emitPush(WS_CHANNELS.gitActionProgress, { + actionId: "action-1", + cwd: "/repo", + action: "commit", + kind: "phase_started", + phase: "commit", + label: "Committing...", + }); expect(onTerminalEvent).toHaveBeenCalledTimes(1); expect(onTerminalEvent).toHaveBeenCalledWith(terminalEvent); expect(onDomainEvent).toHaveBeenCalledTimes(1); expect(onDomainEvent).toHaveBeenCalledWith(orchestrationEvent); + expect(onActionProgress).toHaveBeenCalledTimes(1); + expect(onActionProgress).toHaveBeenCalledWith({ + actionId: "action-1", + cwd: "/repo", + action: "commit", + kind: "phase_started", + phase: "commit", + label: "Committing...", + }); }); it("wraps orchestration dispatch commands in the command envelope", async () => { @@ -320,6 +339,26 @@ describe("wsNativeApi", () => { }); }); + it("uses no client timeout for git.runStackedAction", async () => { + requestMock.mockResolvedValue({ + action: "commit", + branch: { status: "skipped_not_requested" }, + commit: { status: "created", commitSha: "abc1234", subject: "Test" }, + push: { status: "skipped_not_requested" }, + pr: { status: "skipped_not_requested" }, + }); + const { createWsNativeApi } = await import("./wsNativeApi"); + + const api = createWsNativeApi(); + await api.git.runStackedAction({ actionId: "action-1", cwd: "/repo", action: "commit" }); + + expect(requestMock).toHaveBeenCalledWith( + WS_METHODS.gitRunStackedAction, + { actionId: "action-1", cwd: "/repo", action: "commit" }, + { timeoutMs: null }, + ); + }); + it("forwards full-thread diff requests to the orchestration websocket method", async () => { requestMock.mockResolvedValue({ diff: "patch" }); const { createWsNativeApi } = await import("./wsNativeApi"); diff --git a/apps/web/src/wsNativeApi.ts b/apps/web/src/wsNativeApi.ts index ddfffbde69..042875f6f7 100644 --- a/apps/web/src/wsNativeApi.ts +++ b/apps/web/src/wsNativeApi.ts @@ -1,4 +1,5 @@ import { + type GitActionProgressEvent, ORCHESTRATION_WS_CHANNELS, ORCHESTRATION_WS_METHODS, type ContextMenuItem, @@ -15,6 +16,7 @@ import { WsTransport } from "./wsTransport"; let instance: { api: NativeApi; transport: WsTransport } | null = null; const welcomeListeners = new Set<(payload: WsWelcomePayload) => void>(); const serverConfigUpdatedListeners = new Set<(payload: ServerConfigUpdatedPayload) => void>(); +const gitActionProgressListeners = new Set<(payload: GitActionProgressEvent) => void>(); /** * Subscribe to the server welcome message. If a welcome was already received @@ -87,6 +89,16 @@ export function createWsNativeApi(): NativeApi { } } }); + transport.subscribe(WS_CHANNELS.gitActionProgress, (message) => { + const payload = message.data; + for (const listener of gitActionProgressListeners) { + try { + listener(payload); + } catch { + // Swallow listener errors + } + } + }); const api: NativeApi = { dialogs: { @@ -135,7 +147,8 @@ export function createWsNativeApi(): NativeApi { git: { pull: (input) => transport.request(WS_METHODS.gitPull, input), status: (input) => transport.request(WS_METHODS.gitStatus, input), - runStackedAction: (input) => transport.request(WS_METHODS.gitRunStackedAction, input), + runStackedAction: (input) => + transport.request(WS_METHODS.gitRunStackedAction, input, { timeoutMs: null }), listBranches: (input) => transport.request(WS_METHODS.gitListBranches, input), createWorktree: (input) => transport.request(WS_METHODS.gitCreateWorktree, input), removeWorktree: (input) => transport.request(WS_METHODS.gitRemoveWorktree, input), @@ -145,6 +158,12 @@ export function createWsNativeApi(): NativeApi { resolvePullRequest: (input) => transport.request(WS_METHODS.gitResolvePullRequest, input), preparePullRequestThread: (input) => transport.request(WS_METHODS.gitPreparePullRequestThread, input), + onActionProgress: (callback) => { + gitActionProgressListeners.add(callback); + return () => { + gitActionProgressListeners.delete(callback); + }; + }, }, contextMenu: { show: async ( diff --git a/apps/web/src/wsTransport.test.ts b/apps/web/src/wsTransport.test.ts index d905bbcf9a..55ff2556e4 100644 --- a/apps/web/src/wsTransport.test.ts +++ b/apps/web/src/wsTransport.test.ts @@ -206,4 +206,51 @@ describe("WsTransport", () => { await expect(requestPromise).resolves.toEqual({ projects: [] }); transport.dispose(); }); + + it("does not create a timeout for requests with timeoutMs null", async () => { + const timeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const transport = new WsTransport("ws://localhost:3020"); + const socket = getSocket(); + socket.open(); + + const requestPromise = transport.request( + "git.runStackedAction", + { cwd: "/repo" }, + { timeoutMs: null }, + ); + const sent = socket.sent.at(-1); + if (!sent) { + throw new Error("Expected request envelope to be sent"); + } + const requestEnvelope = JSON.parse(sent) as { id: string }; + + socket.serverMessage( + JSON.stringify({ + id: requestEnvelope.id, + result: { ok: true }, + }), + ); + + await expect(requestPromise).resolves.toEqual({ ok: true }); + expect(timeoutSpy.mock.calls.some(([callback]) => typeof callback === "function")).toBe(false); + + transport.dispose(); + }); + + it("rejects pending requests when the websocket closes", async () => { + const transport = new WsTransport("ws://localhost:3020"); + const socket = getSocket(); + socket.open(); + + const requestPromise = transport.request( + "git.runStackedAction", + { cwd: "/repo" }, + { timeoutMs: null }, + ); + + socket.close(); + + await expect(requestPromise).rejects.toThrow("WebSocket connection closed."); + transport.dispose(); + }); }); diff --git a/apps/web/src/wsTransport.ts b/apps/web/src/wsTransport.ts index 46c74d9090..69a5ab8dd0 100644 --- a/apps/web/src/wsTransport.ts +++ b/apps/web/src/wsTransport.ts @@ -14,13 +14,17 @@ type PushListener = (message: WsPushMessage) => void interface PendingRequest { resolve: (result: unknown) => void; reject: (error: Error) => void; - timeout: ReturnType; + timeout: ReturnType | null; } interface SubscribeOptions { readonly replayLatest?: boolean; } +interface RequestOptions { + readonly timeoutMs?: number | null; +} + type TransportState = "connecting" | "open" | "reconnecting" | "closed" | "disposed"; const REQUEST_TIMEOUT_MS = 60_000; @@ -72,7 +76,11 @@ export class WsTransport { this.connect(); } - async request(method: string, params?: unknown): Promise { + async request( + method: string, + params?: unknown, + options?: RequestOptions, + ): Promise { if (typeof method !== "string" || method.length === 0) { throw new Error("Request method is required"); } @@ -83,10 +91,14 @@ export class WsTransport { const encoded = JSON.stringify(message); return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - this.pending.delete(id); - reject(new Error(`Request timed out: ${method}`)); - }, REQUEST_TIMEOUT_MS); + const timeoutMs = options?.timeoutMs === undefined ? REQUEST_TIMEOUT_MS : options.timeoutMs; + const timeout = + timeoutMs === null + ? null + : setTimeout(() => { + this.pending.delete(id); + reject(new Error(`Request timed out: ${method}`)); + }, timeoutMs); this.pending.set(id, { resolve: resolve as (result: unknown) => void, @@ -146,7 +158,9 @@ export class WsTransport { this.reconnectTimer = null; } for (const pending of this.pending.values()) { - clearTimeout(pending.timeout); + if (pending.timeout !== null) { + clearTimeout(pending.timeout); + } pending.reject(new Error("Transport disposed")); } this.pending.clear(); @@ -178,6 +192,13 @@ export class WsTransport { if (this.ws === ws) { this.ws = null; } + for (const [id, pending] of this.pending.entries()) { + if (pending.timeout !== null) { + clearTimeout(pending.timeout); + } + this.pending.delete(id); + pending.reject(new Error("WebSocket connection closed.")); + } if (this.disposed) { this.state = "disposed"; return; @@ -224,7 +245,9 @@ export class WsTransport { return; } - clearTimeout(pending.timeout); + if (pending.timeout !== null) { + clearTimeout(pending.timeout); + } this.pending.delete(message.id); if (message.error) { diff --git a/packages/contracts/src/git.test.ts b/packages/contracts/src/git.test.ts index b3775504fb..afb79d3a7c 100644 --- a/packages/contracts/src/git.test.ts +++ b/packages/contracts/src/git.test.ts @@ -4,6 +4,7 @@ import { Schema } from "effect"; import { GitCreateWorktreeInput, GitPreparePullRequestThreadInput, + GitRunStackedActionInput, GitResolvePullRequestResult, } from "./git"; @@ -11,6 +12,7 @@ const decodeCreateWorktreeInput = Schema.decodeUnknownSync(GitCreateWorktreeInpu const decodePreparePullRequestThreadInput = Schema.decodeUnknownSync( GitPreparePullRequestThreadInput, ); +const decodeRunStackedActionInput = Schema.decodeUnknownSync(GitRunStackedActionInput); const decodeResolvePullRequestResult = Schema.decodeUnknownSync(GitResolvePullRequestResult); describe("GitCreateWorktreeInput", () => { @@ -56,3 +58,16 @@ describe("GitResolvePullRequestResult", () => { expect(parsed.pullRequest.headBranch).toBe("feature/pr-threads"); }); }); + +describe("GitRunStackedActionInput", () => { + it("requires a client-provided actionId for progress correlation", () => { + const parsed = decodeRunStackedActionInput({ + actionId: "action-1", + cwd: "/repo", + action: "commit", + }); + + expect(parsed.actionId).toBe("action-1"); + expect(parsed.action).toBe("commit"); + }); +}); diff --git a/packages/contracts/src/git.ts b/packages/contracts/src/git.ts index e64ca13d72..b7eadce123 100644 --- a/packages/contracts/src/git.ts +++ b/packages/contracts/src/git.ts @@ -8,6 +8,20 @@ const TrimmedNonEmptyStringSchema = TrimmedNonEmptyString; export const GitStackedAction = Schema.Literals(["commit", "commit_push", "commit_push_pr"]); export type GitStackedAction = typeof GitStackedAction.Type; +export const GitActionProgressPhase = Schema.Literals(["branch", "commit", "push", "pr"]); +export type GitActionProgressPhase = typeof GitActionProgressPhase.Type; +export const GitActionProgressKind = Schema.Literals([ + "action_started", + "phase_started", + "hook_started", + "hook_output", + "hook_finished", + "action_finished", + "action_failed", +]); +export type GitActionProgressKind = typeof GitActionProgressKind.Type; +export const GitActionProgressStream = Schema.Literals(["stdout", "stderr"]); +export type GitActionProgressStream = typeof GitActionProgressStream.Type; const GitCommitStepStatus = Schema.Literals(["created", "skipped_no_changes"]); const GitPushStepStatus = Schema.Literals([ "pushed", @@ -58,6 +72,7 @@ export const GitPullInput = Schema.Struct({ export type GitPullInput = typeof GitPullInput.Type; export const GitRunStackedActionInput = Schema.Struct({ + actionId: TrimmedNonEmptyStringSchema, cwd: TrimmedNonEmptyStringSchema, action: GitStackedAction, commitMessage: Schema.optional(TrimmedNonEmptyStringSchema.check(Schema.isMaxLength(10_000))), @@ -211,3 +226,62 @@ export const GitPullResult = Schema.Struct({ upstreamBranch: TrimmedNonEmptyStringSchema.pipe(Schema.NullOr), }); export type GitPullResult = typeof GitPullResult.Type; + +const GitActionProgressBase = Schema.Struct({ + actionId: TrimmedNonEmptyStringSchema, + cwd: TrimmedNonEmptyStringSchema, + action: GitStackedAction, +}); + +const GitActionStartedEvent = Schema.Struct({ + ...GitActionProgressBase.fields, + kind: Schema.Literal("action_started"), + phases: Schema.Array(GitActionProgressPhase), +}); +const GitActionPhaseStartedEvent = Schema.Struct({ + ...GitActionProgressBase.fields, + kind: Schema.Literal("phase_started"), + phase: GitActionProgressPhase, + label: TrimmedNonEmptyStringSchema, +}); +const GitActionHookStartedEvent = Schema.Struct({ + ...GitActionProgressBase.fields, + kind: Schema.Literal("hook_started"), + hookName: TrimmedNonEmptyStringSchema, +}); +const GitActionHookOutputEvent = Schema.Struct({ + ...GitActionProgressBase.fields, + kind: Schema.Literal("hook_output"), + hookName: Schema.NullOr(TrimmedNonEmptyStringSchema), + stream: GitActionProgressStream, + text: TrimmedNonEmptyStringSchema, +}); +const GitActionHookFinishedEvent = Schema.Struct({ + ...GitActionProgressBase.fields, + kind: Schema.Literal("hook_finished"), + hookName: TrimmedNonEmptyStringSchema, + exitCode: Schema.NullOr(Schema.Int), + durationMs: Schema.NullOr(NonNegativeInt), +}); +const GitActionFinishedEvent = Schema.Struct({ + ...GitActionProgressBase.fields, + kind: Schema.Literal("action_finished"), + result: GitRunStackedActionResult, +}); +const GitActionFailedEvent = Schema.Struct({ + ...GitActionProgressBase.fields, + kind: Schema.Literal("action_failed"), + phase: Schema.NullOr(GitActionProgressPhase), + message: TrimmedNonEmptyStringSchema, +}); + +export const GitActionProgressEvent = Schema.Union([ + GitActionStartedEvent, + GitActionPhaseStartedEvent, + GitActionHookStartedEvent, + GitActionHookOutputEvent, + GitActionHookFinishedEvent, + GitActionFinishedEvent, + GitActionFailedEvent, +]); +export type GitActionProgressEvent = typeof GitActionProgressEvent.Type; diff --git a/packages/contracts/src/ipc.ts b/packages/contracts/src/ipc.ts index b9127fb176..ea73024de3 100644 --- a/packages/contracts/src/ipc.ts +++ b/packages/contracts/src/ipc.ts @@ -1,5 +1,6 @@ import type { GitCheckoutInput, + GitActionProgressEvent, GitCreateBranchInput, GitPreparePullRequestThreadInput, GitPreparePullRequestThreadResult, @@ -149,6 +150,7 @@ export interface NativeApi { pull: (input: GitPullInput) => Promise; status: (input: GitStatusInput) => Promise; runStackedAction: (input: GitRunStackedActionInput) => Promise; + onActionProgress: (callback: (event: GitActionProgressEvent) => void) => () => void; }; contextMenu: { show: ( diff --git a/packages/contracts/src/ws.test.ts b/packages/contracts/src/ws.test.ts index d732242ecd..2030dad4e5 100644 --- a/packages/contracts/src/ws.test.ts +++ b/packages/contracts/src/ws.test.ts @@ -95,6 +95,30 @@ it.effect("accepts typed websocket push envelopes with sequence", () => }), ); +it.effect("accepts git.actionProgress push envelopes", () => + Effect.gen(function* () { + const parsed = yield* decodeWsResponse({ + type: "push", + sequence: 3, + channel: WS_CHANNELS.gitActionProgress, + data: { + actionId: "action-1", + cwd: "/repo", + action: "commit", + kind: "phase_started", + phase: "commit", + label: "Committing...", + }, + }); + + if (!("type" in parsed) || parsed.type !== "push") { + assert.fail("expected websocket response to decode as a push envelope"); + } + + assert.strictEqual(parsed.channel, WS_CHANNELS.gitActionProgress); + }), +); + it.effect("rejects push envelopes when channel payload does not match the channel schema", () => Effect.gen(function* () { const result = yield* Effect.exit( diff --git a/packages/contracts/src/ws.ts b/packages/contracts/src/ws.ts index ebb76138b8..45ef0512da 100644 --- a/packages/contracts/src/ws.ts +++ b/packages/contracts/src/ws.ts @@ -12,6 +12,7 @@ import { OrchestrationReplayEventsInput, } from "./orchestration"; import { + GitActionProgressEvent, GitCheckoutInput, GitCreateBranchInput, GitPreparePullRequestThreadInput, @@ -80,6 +81,7 @@ export const WS_METHODS = { // ── Push Event Channels ────────────────────────────────────────────── export const WS_CHANNELS = { + gitActionProgress: "git.actionProgress", terminalEvent: "terminal.event", serverWelcome: "server.welcome", serverConfigUpdated: "server.configUpdated", @@ -172,6 +174,7 @@ export type WsWelcomePayload = typeof WsWelcomePayload.Type; export interface WsPushPayloadByChannel { readonly [WS_CHANNELS.serverWelcome]: WsWelcomePayload; readonly [WS_CHANNELS.serverConfigUpdated]: typeof ServerConfigUpdatedPayload.Type; + readonly [WS_CHANNELS.gitActionProgress]: typeof GitActionProgressEvent.Type; readonly [WS_CHANNELS.terminalEvent]: typeof TerminalEvent.Type; readonly [ORCHESTRATION_WS_CHANNELS.domainEvent]: OrchestrationEvent; } @@ -195,6 +198,10 @@ export const WsPushServerConfigUpdated = makeWsPushSchema( WS_CHANNELS.serverConfigUpdated, ServerConfigUpdatedPayload, ); +export const WsPushGitActionProgress = makeWsPushSchema( + WS_CHANNELS.gitActionProgress, + GitActionProgressEvent, +); export const WsPushTerminalEvent = makeWsPushSchema(WS_CHANNELS.terminalEvent, TerminalEvent); export const WsPushOrchestrationDomainEvent = makeWsPushSchema( ORCHESTRATION_WS_CHANNELS.domainEvent, @@ -202,6 +209,7 @@ export const WsPushOrchestrationDomainEvent = makeWsPushSchema( ); export const WsPushChannelSchema = Schema.Literals([ + WS_CHANNELS.gitActionProgress, WS_CHANNELS.serverWelcome, WS_CHANNELS.serverConfigUpdated, WS_CHANNELS.terminalEvent, @@ -212,6 +220,7 @@ export type WsPushChannelSchema = typeof WsPushChannelSchema.Type; export const WsPush = Schema.Union([ WsPushServerWelcome, WsPushServerConfigUpdated, + WsPushGitActionProgress, WsPushTerminalEvent, WsPushOrchestrationDomainEvent, ]);