From 177e7da4f368e4cb1bf1ff3708fc4ee277ffd46c Mon Sep 17 00:00:00 2001 From: Rian Stockbower Date: Tue, 23 Jun 2026 21:16:55 -0400 Subject: [PATCH 1/7] Isolate reviewer LLM task failures --- internal/llm/adapter.go | 58 +- internal/llm/adapter_test.go | 63 +++ internal/pipeline/pipeline.go | 814 ++++++++++++++++++++++++--- internal/pipeline/pipeline_test.go | 246 +++++++- internal/reviewplan/reviewplan.go | 7 + internal/reviewplan/summary.go | 23 +- internal/reviewplan/summary_test.go | 36 ++ internal/reviewrun/reviewrun.go | 12 +- internal/reviewrun/reviewrun_test.go | 56 ++ 9 files changed, 1212 insertions(+), 103 deletions(-) diff --git a/internal/llm/adapter.go b/internal/llm/adapter.go index 13ebb060..be1e6634 100644 --- a/internal/llm/adapter.go +++ b/internal/llm/adapter.go @@ -72,9 +72,40 @@ type Decoder[T any] func([]byte) (T, error) // StructuredResult contains the validated structured value and adapter metadata. type StructuredResult[T any] struct { - Value T - Response Response - SessionID string + Value T + Response Response + SessionID string + ValidationAttempts []StructuredValidationAttempt +} + +// StructuredValidationAttempt records one failed schema-validation attempt. +type StructuredValidationAttempt struct { + Attempt string + SessionID string + Response Response + DecodeError error +} + +// StructuredValidationError carries both invalid structured-output attempts +// when the validation correction retry also fails. +type StructuredValidationError struct { + Attempts []StructuredValidationAttempt +} + +func (e *StructuredValidationError) Error() string { + first, second := "unknown", "unknown" + if len(e.Attempts) > 0 && e.Attempts[0].DecodeError != nil { + first = e.Attempts[0].DecodeError.Error() + } + if len(e.Attempts) > 1 && e.Attempts[1].DecodeError != nil { + second = e.Attempts[1].DecodeError.Error() + } + return fmt.Sprintf("%s: first: %s; second: %s", ErrStructuredOutputInvalidAfterRetry, first, second) +} + +// Is matches ErrStructuredOutputInvalidAfterRetry for errors.Is callers. +func (e *StructuredValidationError) Is(target error) bool { + return target == ErrStructuredOutputInvalidAfterRetry } // RunStructured runs a structured-output request and retries one validation @@ -108,6 +139,12 @@ func RunStructuredWithSessionResume[T any](ctx context.Context, adapter Adapter, if decodeErr == nil { return StructuredResult[T]{Value: value, Response: response, SessionID: sessionID}, nil } + attempts := []StructuredValidationAttempt{{ + Attempt: "initial", + SessionID: sessionID, + Response: cloneResponse(response), + DecodeError: decodeErr, + }} retryReq := req retryReq.Prompt = retryPrompt(req.Prompt, decodeErr) @@ -121,9 +158,15 @@ func RunStructuredWithSessionResume[T any](ctx context.Context, adapter Adapter, } retryValue, retryErr := decodeStructured(decode, retryResponse.StructuredOutput) if retryErr != nil { - return StructuredResult[T]{Value: zero, Response: retryResponse, SessionID: retrySessionID}, fmt.Errorf("%w: first: %w; second: %w", ErrStructuredOutputInvalidAfterRetry, decodeErr, retryErr) + attempts = append(attempts, StructuredValidationAttempt{ + Attempt: "retry", + SessionID: retrySessionID, + Response: cloneResponse(retryResponse), + DecodeError: retryErr, + }) + return StructuredResult[T]{Value: zero, Response: retryResponse, SessionID: retrySessionID, ValidationAttempts: attempts}, &StructuredValidationError{Attempts: attempts} } - return StructuredResult[T]{Value: retryValue, Response: retryResponse, SessionID: retrySessionID}, nil + return StructuredResult[T]{Value: retryValue, Response: retryResponse, SessionID: retrySessionID, ValidationAttempts: attempts}, nil } // decodeStructured strict-decodes data, then on failure recovers a response @@ -185,6 +228,11 @@ func runOnceAttempt(ctx context.Context, adapter Adapter, resumeSessionID string return stream.SessionID(), response, err } +func cloneResponse(response Response) Response { + response.StructuredOutput = append([]byte(nil), response.StructuredOutput...) + return response +} + func retryPrompt(prompt string, err error) string { return prompt + "\n\nThe previous structured output failed validation: " + validationErrorSummary(err) + "\nReturn corrected JSON only. Do not wrap the JSON in markdown fences, add prose, or include any leading or trailing text." } diff --git a/internal/llm/adapter_test.go b/internal/llm/adapter_test.go index 338e70e9..399fc349 100644 --- a/internal/llm/adapter_test.go +++ b/internal/llm/adapter_test.go @@ -260,6 +260,69 @@ func TestRunStructuredWithSessionResume(t *testing.T) { }) } +func TestRunStructuredValidationAttempts(t *testing.T) { + adapter := &FakeAdapter{SupportsResumeValue: true} + adapter.Queue(FakeResult{SessionID: "initial-session", Response: Response{ + StructuredOutput: []byte(`{"bad":"initial"}`), + Usage: Usage{TokensIn: intPtr(11)}, + }}) + adapter.Queue(FakeResult{SessionID: "retry-session", Response: Response{ + StructuredOutput: []byte(`{"bad":"retry"}`), + Usage: Usage{TokensOut: intPtr(17)}, + }}) + + result, err := RunStructuredWithSessionResume(context.Background(), adapter, "stored-session", Request{Prompt: "prompt"}, func(data []byte) (string, error) { + return "", fmt.Errorf("decode failed for %s", data) + }) + if err == nil { + t.Fatal("RunStructuredWithSessionResume error = nil, want validation error") + } + if !errors.Is(err, ErrStructuredOutputInvalidAfterRetry) { + t.Fatalf("error = %v, want %v", err, ErrStructuredOutputInvalidAfterRetry) + } + var validationErr *StructuredValidationError + if !errors.As(err, &validationErr) { + t.Fatalf("error type = %T, want StructuredValidationError", err) + } + if result.SessionID != "retry-session" { + t.Fatalf("result.SessionID = %q, want retry-session", result.SessionID) + } + assertValidationAttempts(t, result.ValidationAttempts) + assertValidationAttempts(t, validationErr.Attempts) + + resumes := adapter.Resumes() + if len(resumes) != 2 { + t.Fatalf("resumes = %#v, want initial and retry resumes", resumes) + } + if resumes[0].SessionID != "stored-session" || resumes[1].SessionID != "initial-session" { + t.Fatalf("resume sessions = %#v, want stored-session then initial-session", resumes) + } +} + +func assertValidationAttempts(t *testing.T, attempts []StructuredValidationAttempt) { + t.Helper() + if len(attempts) != 2 { + t.Fatalf("attempts = %#v, want two attempts", attempts) + } + want := []struct { + attempt string + session string + raw string + }{ + {attempt: "initial", session: "initial-session", raw: `{"bad":"initial"}`}, + {attempt: "retry", session: "retry-session", raw: `{"bad":"retry"}`}, + } + for i, want := range want { + got := attempts[i] + if got.Attempt != want.attempt || got.SessionID != want.session || string(got.Response.StructuredOutput) != want.raw { + t.Fatalf("attempt[%d] = %#v, want %s/%s/%s", i, got, want.attempt, want.session, want.raw) + } + if got.DecodeError == nil || !strings.Contains(got.DecodeError.Error(), want.raw) { + t.Fatalf("attempt[%d].DecodeError = %v, want raw payload in decode error", i, got.DecodeError) + } + } +} + func TestRunStructuredProseRecovery(t *testing.T) { type probe struct { OK bool `json:"ok"` diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 79f10e74..66004ac6 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -3,6 +3,8 @@ package pipeline import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -14,6 +16,7 @@ import ( "strings" "sync" "time" + "unicode/utf8" "github.com/google/uuid" @@ -62,6 +65,7 @@ type Store interface { DeleteRun(context.Context, string) error AllocateRun(context.Context, ledger.AllocateRunParams) (ledger.Run, error) InsertSession(context.Context, ledger.Session) error + GetSession(context.Context, string) (ledger.Session, error) InsertFinding(context.Context, ledger.Finding) error InsertPlannedAction(context.Context, ledger.PlannedAction) error CompleteRun(context.Context, string, ledger.Outcome, time.Time) error @@ -155,6 +159,7 @@ type ArtifactPaths struct { RollupMarkdown string `json:"rollup_markdown"` AgentSourcesJSON string `json:"agent_sources_json"` AgentLogsDir string `json:"agent_logs_dir"` + LLMTasksDir string `json:"llm_tasks_dir"` } // SlicePatch returns the artifact path for an agent/file diff slice. @@ -176,6 +181,106 @@ func (p ArtifactPaths) AgentLog(agentID string) (string, error) { return filepath.Join(p.AgentLogsDir, statepaths.Encode(agentID)+".jsonl"), nil } +// LLMTaskDir returns the artifact directory for one durable LLM task. +func (p ArtifactPaths) LLMTaskDir(taskID string) (string, error) { + if strings.TrimSpace(taskID) == "" { + return "", fmt.Errorf("pipeline: LLM task ID is required") + } + return filepath.Join(p.LLMTasksDir, statepaths.Encode(taskID)), nil +} + +// LLMTaskMetadata returns the metadata artifact path for one durable LLM task. +func (p ArtifactPaths) LLMTaskMetadata(taskID string) (string, error) { + dir, err := p.LLMTaskDir(taskID) + if err != nil { + return "", err + } + return filepath.Join(dir, "metadata.json"), nil +} + +// LLMTaskValidatedOutput returns the validated structured output path for one task. +func (p ArtifactPaths) LLMTaskValidatedOutput(taskID string) (string, error) { + dir, err := p.LLMTaskDir(taskID) + if err != nil { + return "", err + } + return filepath.Join(dir, "validated-output.json"), nil +} + +// LLMTaskRawAttempt returns the raw structured output path for a failed attempt. +func (p ArtifactPaths) LLMTaskRawAttempt(taskID, attempt string) (string, error) { + dir, err := p.LLMTaskDir(taskID) + if err != nil { + return "", err + } + attempt = strings.TrimSpace(attempt) + if attempt == "" { + return "", fmt.Errorf("pipeline: LLM task attempt is required") + } + return filepath.Join(dir, statepaths.Encode(attempt)+".json"), nil +} + +const llmTaskSchemaVersion = 1 + +type llmTaskStatus string + +const ( + llmTaskStatusSucceeded llmTaskStatus = "succeeded" + llmTaskStatusFailedIsolated llmTaskStatus = "failed_isolated" + llmTaskStatusFailedBlocking llmTaskStatus = "failed_blocking" +) + +type llmTaskMetadata struct { + SchemaVersion int `json:"schema_version"` + TaskID string `json:"task_id"` + Phase string `json:"phase"` + DependencyTaskIDs []string `json:"dependency_task_ids,omitempty"` + InputFingerprint string `json:"input_fingerprint"` + AgentID string `json:"agent_id,omitempty"` + Status llmTaskStatus `json:"status"` + SessionRowID string `json:"session_row_id,omitempty"` + ProviderSessionID string `json:"provider_session_id,omitempty"` + Adapter string `json:"adapter,omitempty"` + Model string `json:"model,omitempty"` + Effort string `json:"effort,omitempty"` + LogPath string `json:"log_path,omitempty"` + ValidatedOutputPath string `json:"validated_output_path,omitempty"` + Error string `json:"error,omitempty"` + Attempts []llmTaskAttemptMetadata `json:"attempts,omitempty"` +} + +type llmTaskAttemptMetadata struct { + Attempt string `json:"attempt"` + ProviderSessionID string `json:"provider_session_id,omitempty"` + RawOutputPath string `json:"raw_output_path,omitempty"` + DecodeError string `json:"decode_error,omitempty"` +} + +var errLLMTaskFailedBlocking = errors.New("pipeline: blocking LLM task failed") + +type llmTaskError struct { + status llmTaskStatus + err error +} + +func (e *llmTaskError) Error() string { + if e == nil || e.err == nil { + return errLLMTaskFailedBlocking.Error() + } + return e.err.Error() +} + +func (e *llmTaskError) Unwrap() error { + if e == nil { + return nil + } + return e.err +} + +func (e *llmTaskError) Is(target error) bool { + return target == errLLMTaskFailedBlocking && e != nil && e.status == llmTaskStatusFailedBlocking +} + // Result is the completed dry-run pipeline output. type Result struct { Run ledger.Run @@ -200,6 +305,15 @@ type Result struct { CurrentHeadSHA string ReviewBaseSHA string ReviewHeadSHA string + ReviewerFailures []ReviewerFailure +} + +// ReviewerFailure records an isolated reviewer LLM task failure that should not +// abort the whole run. +type ReviewerFailure struct { + TaskID string `json:"task_id"` + AgentID string `json:"agent_id"` + Error string `json:"error"` } // SelectionSession describes the single LLM turn used for selection-only execution. @@ -299,6 +413,7 @@ type preparedSelectionContext struct { } type selectionPhaseRequest struct { + RunID string Profile config.Profile SelectionModelOverride string SelectionEffortOverride string @@ -387,7 +502,7 @@ func SelectionOnly(ctx context.Context, opts Options, req SelectionRequest) (Sel return result, nil } - selection, session, err := runSelectionPhase(ctx, opts, selectionPhaseRequest{ + selection, session, _, err := runSelectionPhase(ctx, opts, selectionPhaseRequest{ Profile: req.Profile, SelectionModelOverride: req.SelectionModelOverride, SelectionEffortOverride: req.SelectionEffortOverride, @@ -415,10 +530,11 @@ func execute(ctx context.Context, opts Options, req Request, mode executionMode) return Result{}, err } completed := false + failureOutcome := ledger.OutcomeFailed if mode.live { defer func() { if !completed && !isContextError(err) { - _ = opts.Store.CompleteRun(context.Background(), mode.run.RunID, ledger.OutcomeFailed, opts.now()) + _ = opts.Store.CompleteRun(context.Background(), mode.run.RunID, failureOutcome, opts.now()) } }() } @@ -456,8 +572,31 @@ func execute(ctx context.Context, opts Options, req Request, mode executionMode) } result := prepared.reviewResult() + run := mode.run + if !mode.live { + run, err = opts.Store.AllocateRun(ctx, ledger.AllocateRunParams{ + PRKey: prepared.prKey, + PRURL: req.PRURL, + RunID: runID, + SHA: prepared.reviewPR.Head.SHA, + BaseSHA: prepared.reviewPR.Base.SHA, + Profile: req.ProfileName, + PostingIdentity: postingKey(req.PostingIdentity), + PostMode: ledger.PostModeDryRun, + StartedAt: now, + ArtifactPath: prepared.artifacts.Dir, + }) + if err != nil { + return Result{}, err + } + defer func() { + if !completed { + _ = opts.Store.CompleteRun(context.Background(), run.RunID, failureOutcome, opts.now()) + } + }() + } + result.Run = run - var sessionDrafts []sessionDraft findingSession := map[review.FindingID]string{} if len(prepared.parsed.Patches) == 0 { @@ -483,7 +622,8 @@ func execute(ctx context.Context, opts Options, req Request, mode executionMode) return Result{}, err } - selection, selectionSession, err := runSelectionPhase(ctx, opts, selectionPhaseRequest{ + selection, selectionSession, selectionLedgerSession, err := runSelectionPhase(ctx, opts, selectionPhaseRequest{ + RunID: run.RunID, Profile: req.Profile, SelectionModelOverride: req.SelectionModelOverride, SelectionEffortOverride: req.SelectionEffortOverride, @@ -497,18 +637,26 @@ func execute(ctx context.Context, opts Options, req Request, mode executionMode) MaxAgents: maxAgents, }) if err != nil { + if errors.Is(err, errLLMTaskFailedBlocking) { + failureOutcome = ledger.OutcomeIncomplete + } return Result{}, err } result.Selection = selection - sessionDrafts = append(sessionDrafts, selectionSession) + result.Sessions = appendSessionIfPresent(result.Sessions, selectionLedgerSession) namedSession.recordSessionID(selectionSession) - findings, reviewerSessions, reviewerFindingSessions, err := runReviewers(ctx, opts, req, prepared.reviewPR, prepared.catalog, prepared.parsed, prepared.artifacts, selection, maxConcurrency) + selectionTaskIDs := []string{orchestratorSelectionStage} + findings, reviewerSessions, reviewerLedgerSessions, reviewerFindingSessions, reviewerFailures, err := runReviewers(ctx, opts, req, run.RunID, prepared.reviewPR, prepared.catalog, prepared.parsed, prepared.artifacts, selection, selectionTaskIDs, maxConcurrency) if err != nil { + if errors.Is(err, errLLMTaskFailedBlocking) { + failureOutcome = ledger.OutcomeIncomplete + } return Result{}, err } result.Findings = findings - sessionDrafts = append(sessionDrafts, reviewerSessions...) + result.ReviewerFailures = reviewerFailures + result.Sessions = appendSessionsIfPresent(result.Sessions, reviewerLedgerSessions...) for id, rowID := range reviewerFindingSessions { findingSession[id] = rowID } @@ -519,7 +667,7 @@ func execute(ctx context.Context, opts Options, req Request, mode executionMode) } rollupModel, rollupEffort := rollupRuntimeConfig.model, rollupRuntimeConfig.effort - rollupPrompt, err := buildRollupPrompt(prepared.reviewPR, findings) + rollupPrompt, err := buildRollupPrompt(prepared.reviewPR, findings, reviewerFailures) if err != nil { return Result{}, err } @@ -530,71 +678,56 @@ func execute(ctx context.Context, opts Options, req Request, mode executionMode) if err != nil { return Result{}, err } - rollup, rollupSession, err := runStructuredResume(ctx, opts, ledger.SessionRoleOrchestrator, nil, rollupModel, rollupEffort, rollupLog, rollupPrompt, namedSession.resumeID(), func(data []byte) (review.Rollup, error) { + reviewerDeps := reviewerTaskIDs(selection.SelectedAgents) + rollupDeps := append([]string(nil), selectionTaskIDs...) + rollupDeps = append(rollupDeps, reviewerDeps...) + rollup, rollupSession, rollupLedgerSession, err := runStructuredTask(ctx, opts, llmTaskSpec{ + runID: run.RunID, + taskID: orchestratorRollupStage, + phase: "rollup", + dependencyTaskIDs: rollupDeps, + inputFingerprint: llmTaskFingerprint(orchestratorRollupStage, "rollup", rollupModel, rollupEffort, rollupPrompt, rollupDeps), + artifacts: prepared.artifacts, + role: ledger.SessionRoleOrchestrator, + model: rollupModel, + effort: rollupEffort, + logPath: rollupLog, + prompt: rollupPrompt, + resumeSessionID: namedSession.resumeID(), + }, func(data []byte) (review.Rollup, error) { return llm.DecodeRollup(data, llm.RollupOptions{ FindingSeverities: findingSeverities(findings), MajorEventRequestsChanges: req.MajorRequestChanges, }) }) if err != nil { + if errors.Is(err, errLLMTaskFailedBlocking) { + failureOutcome = ledger.OutcomeIncomplete + } return Result{}, err } result.Rollup = rollup - sessionDrafts = append(sessionDrafts, rollupSession) + result.Sessions = appendSessionIfPresent(result.Sessions, rollupLedgerSession) result.NamedSessionCandidate = namedSession.buildCandidate(rollupSession, opts.now()) if namedSession.enabled && result.NamedSessionCandidate == nil { opts.emitWarning(fmt.Sprintf("session %q was not updated because no orchestrator session was produced", namedSession.active.Name)) } plan, err := opts.buildPlan(req, prepared.reviewPR, mode.planPostMode, result.EffectiveCaps, prepared.parsed.PlanDiff, findings, rollup, selection.ThreadActions, false, result.AgentDefsChanged, planRunInputs{ - hasRun: true, - selection: selectionSession, - reviewers: reviewerSessions, - rollup: rollupSession, - selectedAgents: selection.SelectedAgents, - findingSessions: findingSession, - startedAt: now, + hasRun: true, + selection: selectionSession, + reviewers: reviewerSessions, + rollup: rollupSession, + selectedAgents: selection.SelectedAgents, + findingSessions: findingSession, + reviewerFailures: reviewerFailures, + startedAt: now, }) if err != nil { return Result{}, err } result.Plan = plan } - - run := mode.run - if !mode.live { - run, err = opts.Store.AllocateRun(ctx, ledger.AllocateRunParams{ - PRKey: prepared.prKey, - PRURL: req.PRURL, - RunID: runID, - SHA: prepared.reviewPR.Head.SHA, - BaseSHA: prepared.reviewPR.Base.SHA, - Profile: req.ProfileName, - PostingIdentity: postingKey(req.PostingIdentity), - PostMode: ledger.PostModeDryRun, - StartedAt: now, - ArtifactPath: prepared.artifacts.Dir, - }) - if err != nil { - return Result{}, err - } - } - result.Run = run - if !mode.live { - defer func() { - if !completed { - _ = opts.Store.CompleteRun(context.Background(), run.RunID, ledger.OutcomeFailed, opts.now()) - } - }() - } - - for _, draft := range sessionDrafts { - session := draft.toLedger(run.RunID) - if err := opts.Store.InsertSession(ctx, session); err != nil { - return Result{}, err - } - result.Sessions = append(result.Sessions, session) - } for _, finding := range result.Plan.AnchoredFindings { rowID, err := sessionRowIDForFinding(finding, findingSession) if err != nil { @@ -705,6 +838,9 @@ func prepareSelectionContext(ctx context.Context, opts Options, req selectionSet if err := os.MkdirAll(artifacts.AgentLogsDir, 0o700); err != nil { return preparedSelectionContext{}, fmt.Errorf("pipeline: create agent log dir: %w", err) } + if err := os.MkdirAll(artifacts.LLMTasksDir, 0o700); err != nil { + return preparedSelectionContext{}, fmt.Errorf("pipeline: create LLM task dir: %w", err) + } quota, quotaSupported, err := opts.Adapter.Quota(ctx) if err != nil { @@ -784,36 +920,57 @@ func resolveReviewPRContext(ctx context.Context, provider ReadProvider, ref gitp }, nil } -func runSelectionPhase(ctx context.Context, opts Options, req selectionPhaseRequest) (llm.Selection, sessionDraft, error) { +func runSelectionPhase(ctx context.Context, opts Options, req selectionPhaseRequest) (llm.Selection, sessionDraft, ledger.Session, error) { runtimeConfig, err := resolveSelectionRuntimeConfig(req.Profile, req.SelectionModelOverride, req.SelectionEffortOverride) if err != nil { - return llm.Selection{}, sessionDraft{}, err + return llm.Selection{}, sessionDraft{}, ledger.Session{}, err } model, effort := runtimeConfig.model, runtimeConfig.effort selectionPrompt, err := buildSelectionPrompt(req.ReviewPR, req.Catalog, req.ParsedDiff.Patches, req.Threads, req.MaxAgents, req.SelectionPromptInstructions) if err != nil { - return llm.Selection{}, sessionDraft{}, err + return llm.Selection{}, sessionDraft{}, ledger.Session{}, err } if err := opts.checkPromptBudget("selection", "", model, "", selectionPrompt); err != nil { - return llm.Selection{}, sessionDraft{}, err + return llm.Selection{}, sessionDraft{}, ledger.Session{}, err } selectionLog, err := req.Artifacts.AgentLog(orchestratorSelectionStage) if err != nil { - return llm.Selection{}, sessionDraft{}, err + return llm.Selection{}, sessionDraft{}, ledger.Session{}, err } - selection, selectionSession, err := runStructuredResume(ctx, opts, ledger.SessionRoleOrchestrator, nil, model, effort, selectionLog, selectionPrompt, req.ResumeSessionID, func(data []byte) (llm.Selection, error) { + decode := func(data []byte) (llm.Selection, error) { return llm.DecodeSelection(data, llm.SelectionOptions{ KnownAgents: knownAgents(req.Catalog), ChangedFiles: changedFiles(req.ParsedDiff.Patches), KnownThreads: knownThreads(req.Threads), }) - }) + } + if strings.TrimSpace(req.RunID) != "" { + selection, selectionSession, ledgerSession, err := runStructuredTask(ctx, opts, llmTaskSpec{ + runID: req.RunID, + taskID: orchestratorSelectionStage, + phase: "selection", + inputFingerprint: llmTaskFingerprint(orchestratorSelectionStage, "selection", model, effort, selectionPrompt, nil), + artifacts: req.Artifacts, + role: ledger.SessionRoleOrchestrator, + model: model, + effort: effort, + logPath: selectionLog, + prompt: selectionPrompt, + resumeSessionID: req.ResumeSessionID, + }, decode) + if err != nil { + return llm.Selection{}, selectionSession, ledgerSession, err + } + selection = opts.capSelectionAgents(selection, req.MaxAgents) + return selection, selectionSession, ledgerSession, nil + } + selection, selectionSession, err := runStructuredResume(ctx, opts, ledger.SessionRoleOrchestrator, nil, model, effort, selectionLog, selectionPrompt, req.ResumeSessionID, decode) if err != nil { - return llm.Selection{}, selectionSession, err + return llm.Selection{}, selectionSession, ledger.Session{}, err } selection = opts.capSelectionAgents(selection, req.MaxAgents) - return selection, selectionSession, nil + return selection, selectionSession, ledger.Session{}, nil } func (opts Options) capSelectionAgents(selection llm.Selection, maxAgents int) llm.Selection { @@ -855,7 +1012,21 @@ func sessionRowIDForFinding(finding reviewplan.AnchoredFinding, findingSession m return rowID, nil } -func runReviewers(ctx context.Context, opts Options, req Request, pr gitprovider.PR, catalog agents.Catalog, parsed ParsedDiff, artifacts ArtifactPaths, selection llm.Selection, maxConcurrency int) ([]review.Finding, []sessionDraft, map[review.FindingID]string, error) { +func appendSessionIfPresent(sessions []ledger.Session, session ledger.Session) []ledger.Session { + if strings.TrimSpace(session.SessionRowID) == "" { + return sessions + } + return append(sessions, session) +} + +func appendSessionsIfPresent(sessions []ledger.Session, more ...ledger.Session) []ledger.Session { + for _, session := range more { + sessions = appendSessionIfPresent(sessions, session) + } + return sessions +} + +func runReviewers(ctx context.Context, opts Options, req Request, runID string, pr gitprovider.PR, catalog agents.Catalog, parsed ParsedDiff, artifacts ArtifactPaths, selection llm.Selection, dependencyTaskIDs []string, maxConcurrency int) ([]review.Finding, []sessionDraft, []ledger.Session, map[review.FindingID]string, []ReviewerFailure, error) { type job struct { selected llm.SelectedAgent agent agents.Agent @@ -864,12 +1035,12 @@ func runReviewers(ctx context.Context, opts Options, req Request, pr gitprovider for _, selected := range selection.SelectedAgents { agent, ok := catalog.Find(selected.AgentID) if !ok { - return nil, nil, nil, fmt.Errorf("pipeline: selected agent %q not found", selected.AgentID) + return nil, nil, nil, nil, nil, fmt.Errorf("pipeline: selected agent %q not found", selected.AgentID) } jobs = append(jobs, job{selected: selected, agent: agent}) } if len(jobs) == 0 { - return nil, nil, nil, nil + return nil, nil, nil, nil, nil, nil } sort.Slice(jobs, func(i, j int) bool { return jobs[i].agent.ID < jobs[j].agent.ID }) @@ -879,7 +1050,9 @@ func runReviewers(ctx context.Context, opts Options, req Request, pr gitprovider var mu sync.Mutex var allFindings []review.Finding var sessions []sessionDraft + var ledgerSessions []ledger.Session findingSessions := map[review.FindingID]string{} + var failures []ReviewerFailure var firstErr error var wg sync.WaitGroup for _, current := range jobs { @@ -893,9 +1066,15 @@ func runReviewers(ctx context.Context, opts Options, req Request, pr gitprovider return } defer func() { <-sem }() - findings, session, err := runReviewer(reviewCtx, opts, req, pr, parsed, artifacts, current.selected, current.agent) + findings, session, ledgerSession, failure, err := runReviewer(reviewCtx, opts, req, runID, pr, parsed, artifacts, current.selected, current.agent, dependencyTaskIDs) mu.Lock() defer mu.Unlock() + if failure != nil { + failures = append(failures, *failure) + sessions = append(sessions, session) + ledgerSessions = appendSessionIfPresent(ledgerSessions, ledgerSession) + return + } if err != nil { if firstErr == nil { firstErr = err @@ -904,6 +1083,7 @@ func runReviewers(ctx context.Context, opts Options, req Request, pr gitprovider return } sessions = append(sessions, session) + ledgerSessions = appendSessionIfPresent(ledgerSessions, ledgerSession) for _, finding := range findings { allFindings = append(allFindings, finding) findingSessions[finding.ID] = session.rowID @@ -912,31 +1092,47 @@ func runReviewers(ctx context.Context, opts Options, req Request, pr gitprovider } wg.Wait() if firstErr != nil { - return nil, nil, nil, firstErr + return nil, nil, nil, nil, nil, firstErr } sort.Slice(allFindings, func(i, j int) bool { return allFindings[i].ID < allFindings[j].ID }) - return allFindings, sessions, findingSessions, nil + sort.Slice(failures, func(i, j int) bool { return failures[i].AgentID < failures[j].AgentID }) + return allFindings, sessions, ledgerSessions, findingSessions, failures, nil } -func runReviewer(ctx context.Context, opts Options, req Request, pr gitprovider.PR, parsed ParsedDiff, artifacts ArtifactPaths, selected llm.SelectedAgent, agent agents.Agent) ([]review.Finding, sessionDraft, error) { +func runReviewer(ctx context.Context, opts Options, req Request, runID string, pr gitprovider.PR, parsed ParsedDiff, artifacts ArtifactPaths, selected llm.SelectedAgent, agent agents.Agent, dependencyTaskIDs []string) ([]review.Finding, sessionDraft, ledger.Session, *ReviewerFailure, error) { runtimeConfig, err := resolveReviewerRuntimeConfig(req, agent) if err != nil { - return nil, sessionDraft{}, err + return nil, sessionDraft{}, ledger.Session{}, nil, err } model, effort := runtimeConfig.model, runtimeConfig.effort prompt, err := buildReviewerPrompt(ctx, opts, req, pr, parsed, selected, agent, model) if err != nil { - return nil, sessionDraft{}, err + return nil, sessionDraft{}, ledger.Session{}, nil, err } if err := opts.checkPromptBudget("reviewer", agent.ID, model, strings.Join(selected.Files, ","), prompt); err != nil { - return nil, sessionDraft{}, err + return nil, sessionDraft{}, ledger.Session{}, nil, err } logPath, err := artifacts.AgentLog(agent.ID) if err != nil { - return nil, sessionDraft{}, err + return nil, sessionDraft{}, ledger.Session{}, nil, err } agentID := agent.ID - findings, session, err := runStructured(ctx, opts, ledger.SessionRoleReviewer, &agentID, model, effort, logPath, prompt, func(data []byte) (llm.Findings, error) { + taskID := reviewerTaskID(agent.ID) + findings, session, ledgerSession, err := runStructuredTask(ctx, opts, llmTaskSpec{ + runID: runID, + taskID: taskID, + phase: "reviewer", + dependencyTaskIDs: dependencyTaskIDs, + inputFingerprint: llmTaskFingerprint(taskID, "reviewer", model, effort, prompt, dependencyTaskIDs), + artifacts: artifacts, + role: ledger.SessionRoleReviewer, + agentID: &agentID, + model: model, + effort: effort, + logPath: logPath, + prompt: prompt, + validationFailure: llmTaskStatusFailedIsolated, + }, func(data []byte) (llm.Findings, error) { return llm.DecodeFindings(data, llm.FindingsOptions{ KnownAgents: map[string]bool{agent.ID: true}, ChangedFiles: changedFiles(parsed.Patches), @@ -944,13 +1140,30 @@ func runReviewer(ctx context.Context, opts Options, req Request, pr gitprovider. }) }) if err != nil { - return nil, sessionDraft{}, err + var taskErr *llmTaskError + if errors.As(err, &taskErr) && taskErr.status == llmTaskStatusFailedIsolated { + return nil, session, ledgerSession, &ReviewerFailure{ + TaskID: taskID, + AgentID: agent.ID, + Error: sanitizeTaskError(err), + }, nil + } + return nil, sessionDraft{}, ledger.Session{}, nil, err } - return findings.Findings, session, nil + return findings.Findings, session, ledgerSession, nil, nil } -func runStructured[T any](ctx context.Context, opts Options, role ledger.SessionRole, agentID *string, model, effort, logPath, prompt string, decode llm.Decoder[T]) (T, sessionDraft, error) { - return runStructuredResume(ctx, opts, role, agentID, model, effort, logPath, prompt, "", decode) +func reviewerTaskID(agentID string) string { + return "reviewer-" + statepaths.Encode(agentID) +} + +func reviewerTaskIDs(selected []llm.SelectedAgent) []string { + out := make([]string, 0, len(selected)) + for _, agent := range selected { + out = append(out, reviewerTaskID(agent.AgentID)) + } + sort.Strings(out) + return out } func runStructuredResume[T any](ctx context.Context, opts Options, role ledger.SessionRole, agentID *string, model, effort, logPath, prompt, resumeSessionID string, decode llm.Decoder[T]) (T, sessionDraft, error) { @@ -984,6 +1197,377 @@ func runStructuredResume[T any](ctx context.Context, opts Options, role ledger.S return result.Value, draft, err } +type llmTaskSpec struct { + runID string + taskID string + phase string + dependencyTaskIDs []string + inputFingerprint string + artifacts ArtifactPaths + role ledger.SessionRole + agentID *string + model string + effort string + logPath string + prompt string + resumeSessionID string + validationFailure llmTaskStatus +} + +func runStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpec, decode llm.Decoder[T]) (T, sessionDraft, ledger.Session, error) { + if loaded, draft, session, ok, err := loadStructuredTask(ctx, opts, spec, decode); err != nil || ok { + return loaded, draft, session, err + } + resumeSessionID := spec.resumeSessionID + if meta, ok, err := readLLMTaskMetadata(spec.artifacts, spec.taskID); err != nil { + var zero T + return zero, sessionDraft{}, ledger.Session{}, err + } else if ok && strings.TrimSpace(resumeSessionID) == "" && meta.Status == llmTaskStatusFailedBlocking { + resumeSessionID = taskResumeSessionID(meta) + } + + started := opts.now() + result, err := llm.RunStructuredWithSessionResume(ctx, opts.Adapter, resumeSessionID, llm.Request{ + Model: spec.model, + Effort: spec.effort, + Prompt: spec.prompt, + LogPath: spec.logPath, + }, decode) + completed := opts.now() + draft := sessionDraft{ + rowID: opts.newSessionRowID(), + providerReportedSessionID: result.SessionID, + providerSessionID: result.SessionID, + role: spec.role, + agentID: spec.agentID, + adapter: opts.Adapter.Name(), + model: spec.model, + effort: spec.effort, + startedAt: started, + completedAt: completed, + response: result.Response, + } + if strings.TrimSpace(draft.providerSessionID) == "" && err == nil { + draft.providerSessionID = draft.rowID + } + if strings.TrimSpace(draft.model) == "" { + draft.model = "default" + } + + meta := baseLLMTaskMetadata(opts, spec, draft) + var session ledger.Session + if strings.TrimSpace(draft.providerSessionID) != "" { + session = draft.toLedger(spec.runID) + if err := opts.Store.InsertSession(ctx, session); err != nil { + var zero T + return zero, sessionDraft{}, ledger.Session{}, err + } + } + + if err == nil { + meta.Status = llmTaskStatusSucceeded + meta.SessionRowID = session.SessionRowID + meta.ProviderSessionID = session.ProviderSessionID + if writeErr := writeLLMTaskSuccess(spec.artifacts, &meta, result.Response.StructuredOutput); writeErr != nil { + var zero T + return zero, sessionDraft{}, ledger.Session{}, writeErr + } + return result.Value, draft, session, nil + } + + meta.Error = sanitizeTaskError(err) + meta.Status = llmTaskStatusFailedBlocking + if spec.validationFailure != "" && errors.Is(err, llm.ErrStructuredOutputInvalidAfterRetry) { + meta.Status = spec.validationFailure + } + meta.SessionRowID = session.SessionRowID + meta.ProviderSessionID = session.ProviderSessionID + if writeErr := writeLLMTaskFailure(spec.artifacts, &meta, result.ValidationAttempts); writeErr != nil { + var zero T + return zero, draft, session, writeErr + } + var zero T + return zero, draft, session, &llmTaskError{status: meta.Status, err: err} +} + +func loadStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpec, decode llm.Decoder[T]) (T, sessionDraft, ledger.Session, bool, error) { + var zero T + meta, ok, err := readLLMTaskMetadata(spec.artifacts, spec.taskID) + if err != nil || !ok { + return zero, sessionDraft{}, ledger.Session{}, ok, err + } + if err := validateLLMTaskMetadata(meta, spec); err != nil { + return zero, sessionDraft{}, ledger.Session{}, true, err + } + switch meta.Status { + case llmTaskStatusSucceeded: + outputPath, err := spec.artifacts.LLMTaskValidatedOutput(spec.taskID) + if err != nil { + return zero, sessionDraft{}, ledger.Session{}, true, err + } + if strings.TrimSpace(meta.ValidatedOutputPath) != "" { + outputPath = meta.ValidatedOutputPath + } + data, err := os.ReadFile(outputPath) // #nosec G304 -- validated task output path is scoped to run artifacts. + if err != nil { + return zero, sessionDraft{}, ledger.Session{}, true, fmt.Errorf("pipeline: read LLM task %q output: %w", spec.taskID, err) + } + value, err := decode(data) + if err != nil { + return zero, sessionDraft{}, ledger.Session{}, true, fmt.Errorf("pipeline: decode stored LLM task %q output: %w", spec.taskID, err) + } + session, err := loadTaskSession(ctx, opts, spec.runID, meta) + if err != nil { + return zero, sessionDraft{}, ledger.Session{}, true, err + } + return value, sessionDraftFromLedger(session), session, true, nil + case llmTaskStatusFailedIsolated: + if spec.validationFailure != llmTaskStatusFailedIsolated { + return zero, sessionDraft{}, ledger.Session{}, true, fmt.Errorf("pipeline: LLM task %q has isolated failure status outside reviewer phase", spec.taskID) + } + session, draft, err := loadOptionalTaskSession(ctx, opts, spec.runID, meta) + if err != nil { + return zero, sessionDraft{}, ledger.Session{}, true, err + } + return zero, draft, session, true, &llmTaskError{status: llmTaskStatusFailedIsolated, err: errors.New(taskErrorText(meta))} + case llmTaskStatusFailedBlocking: + return zero, sessionDraft{}, ledger.Session{}, false, nil + default: + return zero, sessionDraft{}, ledger.Session{}, true, fmt.Errorf("pipeline: LLM task %q has unknown status %q", spec.taskID, meta.Status) + } +} + +func validateLLMTaskMetadata(meta llmTaskMetadata, spec llmTaskSpec) error { + if meta.SchemaVersion != llmTaskSchemaVersion { + return fmt.Errorf("pipeline: LLM task %q schema version = %d, want %d", spec.taskID, meta.SchemaVersion, llmTaskSchemaVersion) + } + if meta.TaskID != spec.taskID { + return fmt.Errorf("pipeline: LLM task metadata ID %q does not match %q", meta.TaskID, spec.taskID) + } + if meta.Phase != spec.phase { + return fmt.Errorf("pipeline: LLM task %q phase = %q, want %q", spec.taskID, meta.Phase, spec.phase) + } + fingerprint := strings.TrimSpace(spec.inputFingerprint) + if fingerprint == "" { + fingerprint = llmTaskFingerprint(spec.taskID, spec.phase, spec.model, spec.effort, spec.prompt, spec.dependencyTaskIDs) + } + if meta.InputFingerprint != fingerprint { + return fmt.Errorf("pipeline: LLM task %q input fingerprint changed; pass --rerun to start a fresh review", spec.taskID) + } + return nil +} + +func readLLMTaskMetadata(paths ArtifactPaths, taskID string) (llmTaskMetadata, bool, error) { + path, err := paths.LLMTaskMetadata(taskID) + if err != nil { + return llmTaskMetadata{}, false, err + } + data, err := os.ReadFile(path) // #nosec G304 -- metadata path is derived from run artifact paths. + if errors.Is(err, os.ErrNotExist) { + return llmTaskMetadata{}, false, nil + } + if err != nil { + return llmTaskMetadata{}, false, fmt.Errorf("pipeline: read LLM task %q metadata: %w", taskID, err) + } + var meta llmTaskMetadata + if err := json.Unmarshal(data, &meta); err != nil { + return llmTaskMetadata{}, false, fmt.Errorf("pipeline: decode LLM task %q metadata: %w", taskID, err) + } + return meta, true, nil +} + +func loadTaskSession(ctx context.Context, opts Options, runID string, meta llmTaskMetadata) (ledger.Session, error) { + if strings.TrimSpace(meta.SessionRowID) == "" { + return ledger.Session{}, fmt.Errorf("pipeline: LLM task %q is missing session row id", meta.TaskID) + } + session, err := opts.Store.GetSession(ctx, meta.SessionRowID) + if err != nil { + return ledger.Session{}, fmt.Errorf("pipeline: load LLM task %q session %q: %w", meta.TaskID, meta.SessionRowID, err) + } + if session.RunID != runID { + return ledger.Session{}, fmt.Errorf("pipeline: LLM task %q session belongs to run %q, want %q", meta.TaskID, session.RunID, runID) + } + return session, nil +} + +func loadOptionalTaskSession(ctx context.Context, opts Options, runID string, meta llmTaskMetadata) (ledger.Session, sessionDraft, error) { + if strings.TrimSpace(meta.SessionRowID) == "" { + return ledger.Session{}, sessionDraft{}, nil + } + session, err := loadTaskSession(ctx, opts, runID, meta) + if err != nil { + return ledger.Session{}, sessionDraft{}, err + } + return session, sessionDraftFromLedger(session), nil +} + +func sessionDraftFromLedger(session ledger.Session) sessionDraft { + completedAt := session.StartedAt + if session.CompletedAt != nil { + completedAt = *session.CompletedAt + } + draft := sessionDraft{ + rowID: session.SessionRowID, + providerReportedSessionID: session.ProviderSessionID, + providerSessionID: session.ProviderSessionID, + role: session.Role, + agentID: session.AgentID, + adapter: session.Adapter, + model: session.Model, + startedAt: session.StartedAt, + completedAt: completedAt, + response: llm.Response{ + Usage: llm.Usage{ + TokensIn: int64PtrToInt(session.TokensIn), + TokensOut: int64PtrToInt(session.TokensOut), + CacheRead: int64PtrToInt(session.CacheRead), + CacheCreate: int64PtrToInt(session.CacheCreate), + CostUSD: session.CostUSD, + }, + }, + } + if session.DurationMS != nil { + draft.response.DurationMS = *session.DurationMS + } + if session.Effort != nil { + draft.effort = *session.Effort + } + return draft +} + +func taskResumeSessionID(meta llmTaskMetadata) string { + if strings.TrimSpace(meta.ProviderSessionID) != "" { + return meta.ProviderSessionID + } + for i := len(meta.Attempts) - 1; i >= 0; i-- { + if strings.TrimSpace(meta.Attempts[i].ProviderSessionID) != "" { + return meta.Attempts[i].ProviderSessionID + } + } + return "" +} + +func taskErrorText(meta llmTaskMetadata) string { + if strings.TrimSpace(meta.Error) != "" { + return meta.Error + } + return fmt.Sprintf("LLM task %q failed", meta.TaskID) +} + +func baseLLMTaskMetadata(opts Options, spec llmTaskSpec, draft sessionDraft) llmTaskMetadata { + agentID := "" + if spec.agentID != nil { + agentID = *spec.agentID + } + fingerprint := strings.TrimSpace(spec.inputFingerprint) + if fingerprint == "" { + fingerprint = llmTaskFingerprint(spec.taskID, spec.phase, spec.model, spec.effort, spec.prompt, spec.dependencyTaskIDs) + } + return llmTaskMetadata{ + SchemaVersion: llmTaskSchemaVersion, + TaskID: spec.taskID, + Phase: spec.phase, + DependencyTaskIDs: append([]string(nil), spec.dependencyTaskIDs...), + InputFingerprint: fingerprint, + AgentID: agentID, + SessionRowID: draft.rowID, + ProviderSessionID: draft.providerSessionID, + Adapter: opts.Adapter.Name(), + Model: draft.model, + Effort: draft.effort, + LogPath: spec.logPath, + } +} + +func llmTaskFingerprint(taskID, phase, model, effort, prompt string, deps []string) string { + hash := sha256.New() + for _, part := range []string{ + fmt.Sprintf("schema=%d", llmTaskSchemaVersion), + "task=" + taskID, + "phase=" + phase, + "model=" + model, + "effort=" + effort, + "prompt=" + prompt, + } { + _, _ = io.WriteString(hash, part) + _, _ = io.WriteString(hash, "\n") + } + for _, dep := range deps { + _, _ = io.WriteString(hash, "dep="+dep+"\n") + } + return hex.EncodeToString(hash.Sum(nil)) +} + +func writeLLMTaskSuccess(paths ArtifactPaths, meta *llmTaskMetadata, output []byte) error { + outputPath, err := paths.LLMTaskValidatedOutput(meta.TaskID) + if err != nil { + return err + } + if err := writeFileAtomic(outputPath, append(append([]byte(nil), output...), '\n')); err != nil { + return err + } + meta.ValidatedOutputPath = outputPath + return writeLLMTaskMetadata(paths, *meta) +} + +func writeLLMTaskFailure(paths ArtifactPaths, meta *llmTaskMetadata, attempts []llm.StructuredValidationAttempt) error { + for _, attempt := range attempts { + attemptMeta := llmTaskAttemptMetadata{ + Attempt: attempt.Attempt, + ProviderSessionID: attempt.SessionID, + DecodeError: sanitizeTaskError(attempt.DecodeError), + } + if len(attempt.Response.StructuredOutput) > 0 { + rawPath, err := paths.LLMTaskRawAttempt(meta.TaskID, attempt.Attempt) + if err != nil { + return err + } + if err := writeFileAtomic(rawPath, append(append([]byte(nil), attempt.Response.StructuredOutput...), '\n')); err != nil { + return err + } + attemptMeta.RawOutputPath = rawPath + } + meta.Attempts = append(meta.Attempts, attemptMeta) + } + return writeLLMTaskMetadata(paths, *meta) +} + +func writeLLMTaskMetadata(paths ArtifactPaths, meta llmTaskMetadata) error { + path, err := paths.LLMTaskMetadata(meta.TaskID) + if err != nil { + return err + } + data, err := json.MarshalIndent(meta, "", " ") + if err != nil { + return err + } + return writeFileAtomic(path, append(data, '\n')) +} + +func writeFileAtomic(path string, data []byte) error { + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + return err + } + tmp := path + ".tmp" + if err := os.WriteFile(tmp, data, 0o600); err != nil { + return err + } + return os.Rename(tmp, path) +} + +func sanitizeTaskError(err error) string { + if err == nil { + return "" + } + value := strings.Join(strings.Fields(strings.TrimSpace(err.Error())), " ") + value = strings.ReplaceAll(value, "", + }}, + } + plan, err := Build(req) + if err != nil { + t.Fatalf("Build: %v", err) + } + if plan.Outcome != OutcomeComment { + t.Fatalf("outcome = %q, want comment", plan.Outcome) + } + md := plan.RollupMarkdown + for _, want := range []string{ + "### Reviewer Diagnostics", + "| go:implementation-tests | failed | invalid <json> <!-- codereview:run-id=x --> |", + } { + if !strings.Contains(md, want) { + t.Fatalf("rollup missing %q:\n%s", want, md) + } + } + if strings.Contains(md, "") { + t.Fatalf("diagnostic marker was not neutralized:\n%s", md) + } + }) + t.Run("rollup marker placement unchanged with run summary", func(t *testing.T) { for _, mode := range []PostMode{PostModeLive, PostModeDryRun} { req := summaryRequest() diff --git a/internal/reviewrun/reviewrun.go b/internal/reviewrun/reviewrun.go index 6515788c..32bed019 100644 --- a/internal/reviewrun/reviewrun.go +++ b/internal/reviewrun/reviewrun.go @@ -308,10 +308,16 @@ func planOrResume(ctx context.Context, opts Options, req Request, result Result) return "", nil, err } if !empty { - if completeErr := opts.Store.CompleteRun(ctx, result.Run.RunID, ledger.OutcomeFailed, opts.now()); completeErr != nil { - return "", nil, completeErr + hasTasks, err := pipeline.HasLLMTaskMetadata(result.Run.ArtifactPath) + if err != nil { + return "", nil, err + } + if !hasTasks { + if completeErr := opts.Store.CompleteRun(ctx, result.Run.RunID, ledger.OutcomeFailed, opts.now()); completeErr != nil { + return "", nil, completeErr + } + return "", nil, fmt.Errorf("reviewrun: run %s has partial planning state without planned actions; pass --rerun to start a fresh review", result.Run.RunID) } - return "", nil, fmt.Errorf("reviewrun: run %s has partial planning state without planned actions; pass --rerun to start a fresh review", result.Run.RunID) } } diff --git a/internal/reviewrun/reviewrun_test.go b/internal/reviewrun/reviewrun_test.go index aa698692..8aae2937 100644 --- a/internal/reviewrun/reviewrun_test.go +++ b/internal/reviewrun/reviewrun_test.go @@ -371,6 +371,44 @@ func TestRunResumePartialPlanningStateFailsWithoutReplayingLLM(t *testing.T) { } } +func TestRunResumePartialPlanningStateWithLLMTaskMetadataCallsPlanner(t *testing.T) { + ctx := context.Background() + fixture := newFixture(t) + run := fixture.allocateRun(t, "partial-task-plan", testBaseSHA) + if err := fixture.store.InsertSession(ctx, ledger.Session{ + SessionRowID: "session-1", + RunID: run.RunID, + ProviderSessionID: "provider-session", + Role: ledger.SessionRoleOrchestrator, + Adapter: "fake", + Model: "model", + StartedAt: testNow(), + }); err != nil { + t.Fatalf("InsertSession: %v", err) + } + writeTaskMetadata(t, run.ArtifactPath, "orchestrator-selection", map[string]any{ + "schema_version": 1, + "task_id": "orchestrator-selection", + "phase": "selection", + "input_fingerprint": "fingerprint", + "status": "succeeded", + "session_row_id": "session-1", + "provider_session_id": "provider-session", + }) + planner := &fakePlanner{store: fixture.store, outcome: reviewplan.OutcomeComment} + + result, err := Run(ctx, fixture.opts(planner), Request{Pipeline: fixture.req}) + if err != nil { + t.Fatalf("Run: %v", err) + } + if planner.calls != 1 || len(planner.runs) != 1 || planner.runs[0].RunID != run.RunID { + t.Fatalf("planner calls/runs = %d %#v, want one resume of existing run", planner.calls, planner.runs) + } + if result.Outbox.Outcome != ledger.OutcomeComment { + t.Fatalf("outbox outcome = %q, want comment", result.Outbox.Outcome) + } +} + func TestRunResumeEmptyPlanningStateReplansSameRun(t *testing.T) { ctx := context.Background() fixture := newFixture(t) @@ -775,6 +813,24 @@ func (f *fixture) insertReviewActions(t *testing.T, runID string, event review.R } } +func writeTaskMetadata(t *testing.T, artifactPath, taskID string, metadata map[string]any) { + t.Helper() + path, err := pipeline.ArtifactPathsFromDir(artifactPath).LLMTaskMetadata(taskID) + if err != nil { + t.Fatalf("LLMTaskMetadata: %v", err) + } + if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { + t.Fatalf("MkdirAll(%s): %v", filepath.Dir(path), err) + } + data, err := json.Marshal(metadata) + if err != nil { + t.Fatalf("Marshal task metadata: %v", err) + } + if err := os.WriteFile(path, data, 0o600); err != nil { + t.Fatalf("WriteFile(%s): %v", path, err) + } +} + func setPartialRollup(t *testing.T, f *fixture, runID string, outcome string) { t.Helper() body, err := marker.RenderAction(marker.ActionMarker{ From 0c3510d3316855fe1190061faba944d8acb2c918 Mon Sep 17 00:00:00 2001 From: Rian Stockbower Date: Tue, 23 Jun 2026 21:25:42 -0400 Subject: [PATCH 2/7] Document LLM task artifact contract --- docs/development.md | 4 ++ docs/llm-task-artifacts.md | 84 +++++++++++++++++++++++++++++ internal/llm/adapter.go | 6 +-- internal/llm/adapter_test.go | 10 ++-- internal/pipeline/pipeline.go | 76 +++++++++++++------------- internal/reviewplan/summary.go | 6 +-- internal/reviewplan/summary_test.go | 4 +- 7 files changed, 139 insertions(+), 51 deletions(-) create mode 100644 docs/llm-task-artifacts.md diff --git a/docs/development.md b/docs/development.md index 7470c288..a273ffb2 100644 --- a/docs/development.md +++ b/docs/development.md @@ -22,6 +22,10 @@ Within `internal/pipeline`, the public entry points are `DryRun`, `Live`, and and no ledger or posting side effects so benchmark tooling can reuse the real selector implementation. +Structured LLM calls in the review pipeline are durable per-task units. See +`docs/llm-task-artifacts.md` for the artifact schema, status taxonomy, and +resume invariants. + ## Quick Commands ```bash diff --git a/docs/llm-task-artifacts.md b/docs/llm-task-artifacts.md new file mode 100644 index 00000000..a9b9a3ed --- /dev/null +++ b/docs/llm-task-artifacts.md @@ -0,0 +1,84 @@ +# LLM Task Artifacts + +`cr review` treats each structured LLM call as a durable task. Selection, +reviewer, and rollup calls must be isolated from each other so one failed task +does not erase successful upstream work or force unrelated LLM sessions to run +again. + +Task artifacts live under a run artifact directory: + +```text +llm-tasks// + metadata.json + validated-output.json + initial.json + retry.json +``` + +`metadata.json` is the commit marker. Writers must publish it last, after any +validated output or raw failed-attempt payloads are written and after the ledger +session row exists when a provider session is available. Resume code must only +trust the final `metadata.json` name, never a temporary metadata file. + +## Schema Version + +`schema_version` is currently `1`. Bump it when changing any load-bearing field, +status value, fingerprint input, task identity, or resume rule in a way that +could make an in-flight run unsafe to resume. + +Load-bearing metadata fields are: + +- `task_id`: stable task identity. Current values are `orchestrator-selection`, + `reviewer-`, and `orchestrator-rollup`. +- `phase`: task phase, such as `selection`, `reviewer`, or `rollup`. +- `dependency_task_ids`: task IDs whose completed state was included in this + task input. +- `input_fingerprint`: hash of the task schema version, task identity, phase, + model/effort, prompt, and dependency task IDs. +- `agent_id`: reviewer agent ID for reviewer tasks. +- `status`: one of `succeeded`, `failed_isolated`, or `failed_blocking`. +- `session_row_id` and `provider_session_id`: ledger/provider session handles + used for run summaries and provider-level resume. +- `adapter`, `model`, `effort`, and `log_path`: execution context. +- `validated_output_path`: structured output to decode when reusing a succeeded + task. +- `error`: sanitized diagnostic for failed tasks. +- `attempts`: failed validation attempts with attempt label, provider session + ID, raw output path when present, and decode error. + +## Status Semantics + +`succeeded` means the task produced validated structured output. Resume may +reuse the output only when the metadata schema and input fingerprint still match +the current task. + +`failed_isolated` is for reviewer-local structured validation failures. The +failed reviewer is treated as dependency-satisfied for downstream rollup, and +the rollup receives a diagnostic. Sibling reviewers continue to run. A review +with any isolated reviewer failure must not approve; the final event is clamped +to at least `comment`. + +`failed_blocking` means the task prevents dependent phases from safely running. +Selection and rollup failures are blocking. Once a run exists, blocking LLM task +failures leave the run `incomplete` so the normal resume gate can rerun only the +failed task and downstream work. + +Provider start/wait failures may have empty `attempts` because no structured +output existed. When a provider session ID is known, retry should seed the next +task call with that session if the adapter supports resume. + +## Resume Rules + +Resume starts at the first task that cannot be reused: + +- Load a matching `succeeded` selection task instead of rerunning selection. +- Load matching `succeeded` reviewer tasks instead of rerunning reviewers. +- Load `failed_isolated` reviewer diagnostics instead of rerunning those + reviewers automatically. +- Rerun `failed_blocking` tasks and downstream phases. +- Fail with rerun guidance when metadata is missing required payloads, points to + a missing ledger session, has the wrong schema version, or has a stale input + fingerprint. + +Raw invalid structured output is local artifact data. Public rollups may include +concise diagnostics, but they must not include raw failed model output. diff --git a/internal/llm/adapter.go b/internal/llm/adapter.go index be1e6634..fc5a15e2 100644 --- a/internal/llm/adapter.go +++ b/internal/llm/adapter.go @@ -80,7 +80,7 @@ type StructuredResult[T any] struct { // StructuredValidationAttempt records one failed schema-validation attempt. type StructuredValidationAttempt struct { - Attempt string + Label string SessionID string Response Response DecodeError error @@ -140,7 +140,7 @@ func RunStructuredWithSessionResume[T any](ctx context.Context, adapter Adapter, return StructuredResult[T]{Value: value, Response: response, SessionID: sessionID}, nil } attempts := []StructuredValidationAttempt{{ - Attempt: "initial", + Label: "initial", SessionID: sessionID, Response: cloneResponse(response), DecodeError: decodeErr, @@ -159,7 +159,7 @@ func RunStructuredWithSessionResume[T any](ctx context.Context, adapter Adapter, retryValue, retryErr := decodeStructured(decode, retryResponse.StructuredOutput) if retryErr != nil { attempts = append(attempts, StructuredValidationAttempt{ - Attempt: "retry", + Label: "retry", SessionID: retrySessionID, Response: cloneResponse(retryResponse), DecodeError: retryErr, diff --git a/internal/llm/adapter_test.go b/internal/llm/adapter_test.go index 399fc349..84ec5ec8 100644 --- a/internal/llm/adapter_test.go +++ b/internal/llm/adapter_test.go @@ -305,17 +305,17 @@ func assertValidationAttempts(t *testing.T, attempts []StructuredValidationAttem t.Fatalf("attempts = %#v, want two attempts", attempts) } want := []struct { - attempt string + label string session string raw string }{ - {attempt: "initial", session: "initial-session", raw: `{"bad":"initial"}`}, - {attempt: "retry", session: "retry-session", raw: `{"bad":"retry"}`}, + {label: "initial", session: "initial-session", raw: `{"bad":"initial"}`}, + {label: "retry", session: "retry-session", raw: `{"bad":"retry"}`}, } for i, want := range want { got := attempts[i] - if got.Attempt != want.attempt || got.SessionID != want.session || string(got.Response.StructuredOutput) != want.raw { - t.Fatalf("attempt[%d] = %#v, want %s/%s/%s", i, got, want.attempt, want.session, want.raw) + if got.Label != want.label || got.SessionID != want.session || string(got.Response.StructuredOutput) != want.raw { + t.Fatalf("attempt[%d] = %#v, want %s/%s/%s", i, got, want.label, want.session, want.raw) } if got.DecodeError == nil || !strings.Contains(got.DecodeError.Error(), want.raw) { t.Fatalf("attempt[%d].DecodeError = %v, want raw payload in decode error", i, got.DecodeError) diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 66004ac6..bf302f18 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -1119,19 +1119,19 @@ func runReviewer(ctx context.Context, opts Options, req Request, runID string, p agentID := agent.ID taskID := reviewerTaskID(agent.ID) findings, session, ledgerSession, err := runStructuredTask(ctx, opts, llmTaskSpec{ - runID: runID, - taskID: taskID, - phase: "reviewer", - dependencyTaskIDs: dependencyTaskIDs, - inputFingerprint: llmTaskFingerprint(taskID, "reviewer", model, effort, prompt, dependencyTaskIDs), - artifacts: artifacts, - role: ledger.SessionRoleReviewer, - agentID: &agentID, - model: model, - effort: effort, - logPath: logPath, - prompt: prompt, - validationFailure: llmTaskStatusFailedIsolated, + runID: runID, + taskID: taskID, + phase: "reviewer", + dependencyTaskIDs: dependencyTaskIDs, + inputFingerprint: llmTaskFingerprint(taskID, "reviewer", model, effort, prompt, dependencyTaskIDs), + artifacts: artifacts, + role: ledger.SessionRoleReviewer, + agentID: &agentID, + model: model, + effort: effort, + logPath: logPath, + prompt: prompt, + validationFailureStatus: llmTaskStatusFailedIsolated, }, func(data []byte) (llm.Findings, error) { return llm.DecodeFindings(data, llm.FindingsOptions{ KnownAgents: map[string]bool{agent.ID: true}, @@ -1145,7 +1145,7 @@ func runReviewer(ctx context.Context, opts Options, req Request, runID string, p return nil, session, ledgerSession, &ReviewerFailure{ TaskID: taskID, AgentID: agent.ID, - Error: sanitizeTaskError(err), + Error: sanitizeTaskErrorForMarkdown(err), }, nil } return nil, sessionDraft{}, ledger.Session{}, nil, err @@ -1198,20 +1198,20 @@ func runStructuredResume[T any](ctx context.Context, opts Options, role ledger.S } type llmTaskSpec struct { - runID string - taskID string - phase string - dependencyTaskIDs []string - inputFingerprint string - artifacts ArtifactPaths - role ledger.SessionRole - agentID *string - model string - effort string - logPath string - prompt string - resumeSessionID string - validationFailure llmTaskStatus + runID string + taskID string + phase string + dependencyTaskIDs []string + inputFingerprint string + artifacts ArtifactPaths + role ledger.SessionRole + agentID *string + model string + effort string + logPath string + prompt string + resumeSessionID string + validationFailureStatus llmTaskStatus } func runStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpec, decode llm.Decoder[T]) (T, sessionDraft, ledger.Session, error) { @@ -1275,10 +1275,10 @@ func runStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpe return result.Value, draft, session, nil } - meta.Error = sanitizeTaskError(err) + meta.Error = sanitizeTaskErrorForMarkdown(err) meta.Status = llmTaskStatusFailedBlocking - if spec.validationFailure != "" && errors.Is(err, llm.ErrStructuredOutputInvalidAfterRetry) { - meta.Status = spec.validationFailure + if spec.validationFailureStatus != "" && errors.Is(err, llm.ErrStructuredOutputInvalidAfterRetry) { + meta.Status = spec.validationFailureStatus } meta.SessionRowID = session.SessionRowID meta.ProviderSessionID = session.ProviderSessionID @@ -1322,7 +1322,7 @@ func loadStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSp } return value, sessionDraftFromLedger(session), session, true, nil case llmTaskStatusFailedIsolated: - if spec.validationFailure != llmTaskStatusFailedIsolated { + if spec.validationFailureStatus != llmTaskStatusFailedIsolated { return zero, sessionDraft{}, ledger.Session{}, true, fmt.Errorf("pipeline: LLM task %q has isolated failure status outside reviewer phase", spec.taskID) } session, draft, err := loadOptionalTaskSession(ctx, opts, spec.runID, meta) @@ -1513,12 +1513,12 @@ func writeLLMTaskSuccess(paths ArtifactPaths, meta *llmTaskMetadata, output []by func writeLLMTaskFailure(paths ArtifactPaths, meta *llmTaskMetadata, attempts []llm.StructuredValidationAttempt) error { for _, attempt := range attempts { attemptMeta := llmTaskAttemptMetadata{ - Attempt: attempt.Attempt, + Attempt: attempt.Label, ProviderSessionID: attempt.SessionID, - DecodeError: sanitizeTaskError(attempt.DecodeError), + DecodeError: sanitizeTaskErrorForMarkdown(attempt.DecodeError), } if len(attempt.Response.StructuredOutput) > 0 { - rawPath, err := paths.LLMTaskRawAttempt(meta.TaskID, attempt.Attempt) + rawPath, err := paths.LLMTaskRawAttempt(meta.TaskID, attempt.Label) if err != nil { return err } @@ -1555,7 +1555,7 @@ func writeFileAtomic(path string, data []byte) error { return os.Rename(tmp, path) } -func sanitizeTaskError(err error) string { +func sanitizeTaskErrorForMarkdown(err error) string { if err == nil { return "" } @@ -1757,8 +1757,8 @@ func reviewerFailureSummaries(failures []ReviewerFailure) []reviewplan.ReviewerF out := make([]reviewplan.ReviewerFailureSummary, 0, len(failures)) for _, failure := range failures { out = append(out, reviewplan.ReviewerFailureSummary{ - Name: failure.AgentID, - Error: failure.Error, + AgentID: failure.AgentID, + Error: failure.Error, }) } return out diff --git a/internal/reviewplan/summary.go b/internal/reviewplan/summary.go index bd47b36a..910b5942 100644 --- a/internal/reviewplan/summary.go +++ b/internal/reviewplan/summary.go @@ -56,8 +56,8 @@ type RunSummary struct { // ReviewerFailureSummary is a reviewer task diagnostic rendered in the rollup. type ReviewerFailureSummary struct { - Name string - Error string + AgentID string + Error string } // WorkstreamUsage is adapter-reported usage for one workstream: the reserved @@ -232,7 +232,7 @@ func writeReviewerFailureDiagnostics(out *strings.Builder, failures []ReviewerFa out.WriteString("| Reviewer | Status | Diagnostic |\n") out.WriteString("|----------|--------|------------|\n") for _, failure := range failures { - fmt.Fprintf(out, "| %s | failed | %s |\n", escapeCell(failure.Name), escapeCell(failure.Error)) + fmt.Fprintf(out, "| %s | failed | %s |\n", escapeCell(failure.AgentID), escapeCell(failure.Error)) } out.WriteString("\n") } diff --git a/internal/reviewplan/summary_test.go b/internal/reviewplan/summary_test.go index e729413c..388c51ea 100644 --- a/internal/reviewplan/summary_test.go +++ b/internal/reviewplan/summary_test.go @@ -313,8 +313,8 @@ func TestRollupSummaryRendering(t *testing.T) { req.RunSummary = RunSummary{ SelectedReviewers: []string{"go:implementation-tests"}, ReviewerFailures: []ReviewerFailureSummary{{ - Name: "go:implementation-tests", - Error: "invalid ", + AgentID: "go:implementation-tests", + Error: "invalid ", }}, } plan, err := Build(req) From 6ee858ed33385a3fd6f954614121c7212d1871ae Mon Sep 17 00:00:00 2001 From: Rian Stockbower Date: Tue, 23 Jun 2026 21:29:00 -0400 Subject: [PATCH 3/7] docs: clarify LLM task attempt files --- docs/llm-task-artifacts.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/llm-task-artifacts.md b/docs/llm-task-artifacts.md index a9b9a3ed..7c0f806f 100644 --- a/docs/llm-task-artifacts.md +++ b/docs/llm-task-artifacts.md @@ -15,6 +15,10 @@ llm-tasks// retry.json ``` +Raw failed-attempt files are named `