Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ Within `internal/pipeline`, the public entry points are `DryRun`, `Live`, and
and no ledger or posting side effects so benchmark tooling can reuse the real
selector implementation.

Structured LLM calls in the review pipeline are durable per-task units. See
`docs/llm-task-artifacts.md` for the artifact schema, status taxonomy, and
resume invariants.

## Quick Commands

```bash
Expand Down
92 changes: 92 additions & 0 deletions docs/llm-task-artifacts.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# LLM Task Artifacts

`cr review` treats each structured LLM call as a durable task. Selection,
reviewer, and rollup calls must be isolated from each other so one failed task
does not erase successful upstream work or force unrelated LLM sessions to run
again.

Task artifacts live under a run artifact directory:

```text
llm-tasks/<encoded-task-id>/
metadata.json
Comment thread
monit-reviewer marked this conversation as resolved.
validated-output.json
initial.json
retry.json
```

Raw failed-attempt files are named `<label>.json` in the task directory. The
current structured adapter labels are `initial` and `retry`, which produce
`initial.json` and `retry.json` when raw invalid output is available.

`metadata.json` is the commit marker. Writers must publish it last, after any
validated output or raw failed-attempt payloads are written and after the ledger
session row exists when a provider session is available. Resume code must only
trust the final `metadata.json` name, never a temporary metadata file.

## Schema Version

`schema_version` is currently `1`. Bump it when changing any load-bearing field,
status value, fingerprint input, task identity, or resume rule in a way that
could make an in-flight run unsafe to resume.

Load-bearing metadata fields are:

- `task_id`: stable task identity. Current values are `orchestrator-selection`,
`reviewer-<encoded-agent-id>`, and `orchestrator-rollup`.
- `phase`: task phase, such as `selection`, `reviewer`, or `rollup`.
- `dependency_task_ids`: task IDs whose completed state was included in this
task input.
- `input_fingerprint`: hash of the task schema version, adapter, task identity,
phase, model/effort, prompt, and dependency task IDs.
- `agent_id`: reviewer agent ID for reviewer tasks.
- `status`: one of `succeeded`, `failed_isolated`, or `failed_blocking`.
- `session_row_id` and `provider_session_id`: ledger/provider session handles
used for run summaries and provider-level resume.
- `adapter`, `model`, `effort`, and `log_path`: execution context.
- `validated_output_path`: structured output to decode when reusing a succeeded
task.
- `error`: sanitized diagnostic for failed tasks.
- `attempts`: failed validation attempts with attempt label, provider session
ID, raw output path when present, and decode error.

## Status Semantics

`succeeded` means the task produced validated structured output. Resume may
reuse the output only when the metadata schema and input fingerprint still match
the current task.

`failed_isolated` is for reviewer-local LLM failures while the caller context is
still valid. This includes structured validation failures and provider failures
after a task provider session has started. Provider start failures with no
session are treated as blocking because they can indicate auth, quota, or other
systemic adapter problems. The failed reviewer is treated as
dependency-satisfied for downstream rollup, and the rollup receives a
diagnostic. Sibling reviewers continue to run. A review with any isolated
reviewer failure must not approve; the final event is clamped to at least
`comment`.

`failed_blocking` means the task prevents dependent phases from safely running.
Selection and rollup failures are blocking. Once a run exists, blocking LLM task
failures leave the run `incomplete` so the normal resume gate can rerun only the
failed task and downstream work.

Provider start/wait failures may have empty `attempts` because no structured
output existed. When a provider session ID is known, retry should seed the next
task call with that session if the adapter supports resume.

## Resume Rules

Resume starts at the first task that cannot be reused:

- Load a matching `succeeded` selection task instead of rerunning selection.
- Load matching `succeeded` reviewer tasks instead of rerunning reviewers.
- Load `failed_isolated` reviewer diagnostics instead of rerunning those
reviewers automatically.
- Rerun `failed_blocking` tasks and downstream phases.
- Fail with rerun guidance when metadata is missing required payloads, points to
a missing ledger session, has the wrong schema version, or has a stale input
fingerprint.

Raw invalid structured output is local artifact data. Public rollups may include
concise diagnostics, but they must not include raw failed model output.
62 changes: 55 additions & 7 deletions internal/llm/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,40 @@ type Decoder[T any] func([]byte) (T, error)

// StructuredResult contains the validated structured value and adapter metadata.
type StructuredResult[T any] struct {
Value T
Response Response
SessionID string
Value T
Response Response
SessionID string
ValidationAttempts []StructuredValidationAttempt
}

Comment thread
monit-reviewer marked this conversation as resolved.
// StructuredValidationAttempt records one failed schema-validation attempt.
type StructuredValidationAttempt struct {
Label string
SessionID string
Response Response
DecodeError error
}

// StructuredValidationError carries both invalid structured-output attempts
// when the validation correction retry also fails.
type StructuredValidationError struct {
Attempts []StructuredValidationAttempt
}

func (e *StructuredValidationError) Error() string {
first, second := "unknown", "unknown"
if len(e.Attempts) > 0 && e.Attempts[0].DecodeError != nil {
first = e.Attempts[0].DecodeError.Error()
}
if len(e.Attempts) > 1 && e.Attempts[1].DecodeError != nil {
second = e.Attempts[1].DecodeError.Error()
}
return fmt.Sprintf("%s: first: %s; second: %s", ErrStructuredOutputInvalidAfterRetry, first, second)
}

// Is matches ErrStructuredOutputInvalidAfterRetry for errors.Is callers.
func (e *StructuredValidationError) Is(target error) bool {
return target == ErrStructuredOutputInvalidAfterRetry
}

// RunStructured runs a structured-output request and retries one validation
Expand Down Expand Up @@ -102,12 +133,18 @@ func RunStructuredWithSessionResume[T any](ctx context.Context, adapter Adapter,
var zero T
sessionID, response, err := runOnceWithSession(ctx, adapter, resumeSessionID, req)
if err != nil {
return StructuredResult[T]{Response: response}, err
return StructuredResult[T]{Response: response, SessionID: sessionID}, err
}
value, decodeErr := decodeStructured(decode, response.StructuredOutput)
if decodeErr == nil {
return StructuredResult[T]{Value: value, Response: response, SessionID: sessionID}, nil
}
attempts := []StructuredValidationAttempt{{
Label: "initial",
SessionID: sessionID,
Response: cloneResponse(response),
DecodeError: decodeErr,
}}

retryReq := req
retryReq.Prompt = retryPrompt(req.Prompt, decodeErr)
Expand All @@ -117,13 +154,19 @@ func RunStructuredWithSessionResume[T any](ctx context.Context, adapter Adapter,
}
retrySessionID, retryResponse, err := runOnceWithSession(ctx, adapter, retryResumeSessionID, retryReq)
if err != nil {
return StructuredResult[T]{Response: retryResponse}, err
return StructuredResult[T]{Response: retryResponse, SessionID: retrySessionID, ValidationAttempts: attempts}, err
}
retryValue, retryErr := decodeStructured(decode, retryResponse.StructuredOutput)
if retryErr != nil {
return StructuredResult[T]{Value: zero, Response: retryResponse, SessionID: retrySessionID}, fmt.Errorf("%w: first: %w; second: %w", ErrStructuredOutputInvalidAfterRetry, decodeErr, retryErr)
attempts = append(attempts, StructuredValidationAttempt{
Label: "retry",
SessionID: retrySessionID,
Response: cloneResponse(retryResponse),
DecodeError: retryErr,
})
return StructuredResult[T]{Value: zero, Response: retryResponse, SessionID: retrySessionID, ValidationAttempts: attempts}, &StructuredValidationError{Attempts: attempts}
}
return StructuredResult[T]{Value: retryValue, Response: retryResponse, SessionID: retrySessionID}, nil
return StructuredResult[T]{Value: retryValue, Response: retryResponse, SessionID: retrySessionID, ValidationAttempts: attempts}, nil
}

// decodeStructured strict-decodes data, then on failure recovers a response
Expand Down Expand Up @@ -185,6 +228,11 @@ func runOnceAttempt(ctx context.Context, adapter Adapter, resumeSessionID string
return stream.SessionID(), response, err
}

func cloneResponse(response Response) Response {
response.StructuredOutput = append([]byte(nil), response.StructuredOutput...)
return response
}

func retryPrompt(prompt string, err error) string {
return prompt + "\n\nThe previous structured output failed validation: " + validationErrorSummary(err) + "\nReturn corrected JSON only. Do not wrap the JSON in markdown fences, add prose, or include any leading or trailing text."
}
Expand Down
79 changes: 79 additions & 0 deletions internal/llm/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,85 @@ func TestRunStructuredWithSessionResume(t *testing.T) {
t.Fatalf("retry prompt = %q, want validation suffix", resumes[1].Request.Prompt)
}
})

t.Run("wait error preserves started session", func(t *testing.T) {
waitErr := errors.New("wait failed")
adapter := &FakeAdapter{}
adapter.Queue(FakeResult{SessionID: "started-session", WaitErr: waitErr})

result, err := RunStructuredWithSessionResume(context.Background(), adapter, "", Request{Prompt: "prompt"}, func(_ []byte) (string, error) {
return "unused", nil
})
if !errors.Is(err, waitErr) {
t.Fatalf("RunStructuredWithSessionResume error = %v, want %v", err, waitErr)
}
if result.SessionID != "started-session" {
t.Fatalf("result.SessionID = %q, want started-session", result.SessionID)
}
})
}

func TestRunStructuredValidationAttempts(t *testing.T) {
adapter := &FakeAdapter{SupportsResumeValue: true}
adapter.Queue(FakeResult{SessionID: "initial-session", Response: Response{
StructuredOutput: []byte(`{"bad":"initial"}`),
Usage: Usage{TokensIn: intPtr(11)},
}})
adapter.Queue(FakeResult{SessionID: "retry-session", Response: Response{
StructuredOutput: []byte(`{"bad":"retry"}`),
Usage: Usage{TokensOut: intPtr(17)},
}})

result, err := RunStructuredWithSessionResume(context.Background(), adapter, "stored-session", Request{Prompt: "prompt"}, func(data []byte) (string, error) {
return "", fmt.Errorf("decode failed for %s", data)
})
if err == nil {
t.Fatal("RunStructuredWithSessionResume error = nil, want validation error")
}
if !errors.Is(err, ErrStructuredOutputInvalidAfterRetry) {
t.Fatalf("error = %v, want %v", err, ErrStructuredOutputInvalidAfterRetry)
}
var validationErr *StructuredValidationError
if !errors.As(err, &validationErr) {
t.Fatalf("error type = %T, want StructuredValidationError", err)
}
if result.SessionID != "retry-session" {
t.Fatalf("result.SessionID = %q, want retry-session", result.SessionID)
}
assertValidationAttempts(t, result.ValidationAttempts)
assertValidationAttempts(t, validationErr.Attempts)

resumes := adapter.Resumes()
if len(resumes) != 2 {
t.Fatalf("resumes = %#v, want initial and retry resumes", resumes)
}
if resumes[0].SessionID != "stored-session" || resumes[1].SessionID != "initial-session" {
t.Fatalf("resume sessions = %#v, want stored-session then initial-session", resumes)
}
}

func assertValidationAttempts(t *testing.T, attempts []StructuredValidationAttempt) {
t.Helper()
if len(attempts) != 2 {
t.Fatalf("attempts = %#v, want two attempts", attempts)
}
want := []struct {
label string
session string
raw string
}{
{label: "initial", session: "initial-session", raw: `{"bad":"initial"}`},
{label: "retry", session: "retry-session", raw: `{"bad":"retry"}`},
}
for i, want := range want {
got := attempts[i]
if got.Label != want.label || got.SessionID != want.session || string(got.Response.StructuredOutput) != want.raw {
t.Fatalf("attempt[%d] = %#v, want %s/%s/%s", i, got, want.label, want.session, want.raw)
}
if got.DecodeError == nil || !strings.Contains(got.DecodeError.Error(), want.raw) {
t.Fatalf("attempt[%d].DecodeError = %v, want raw payload in decode error", i, got.DecodeError)
}
}
}

func TestRunStructuredProseRecovery(t *testing.T) {
Expand Down
Loading
Loading