diff --git a/internal/cmd/reviewcmd/adapter_progress.go b/internal/cmd/reviewcmd/adapter_progress.go new file mode 100644 index 0000000..5542d0a --- /dev/null +++ b/internal/cmd/reviewcmd/adapter_progress.go @@ -0,0 +1,122 @@ +package reviewcmd + +import ( + "context" + "path/filepath" + "strconv" + "strings" + + "github.com/open-cli-collective/codereview-cli/internal/llm" + "github.com/open-cli-collective/codereview-cli/internal/progress" +) + +type progressAdapter struct { + adapter llm.Adapter + logger *progress.Logger + provider string + harness string +} + +type progressStream struct { + stream llm.Stream + span *progress.Span +} + +func withProgressAdapter(logger *progress.Logger, adapter llm.Adapter, provider, harness string) llm.Adapter { + if adapter == nil || logger == nil { + return adapter + } + return progressAdapter{ + adapter: adapter, + logger: logger, + provider: strings.TrimSpace(provider), + harness: strings.TrimSpace(harness), + } +} + +func (a progressAdapter) Name() string { return a.adapter.Name() } + +func (a progressAdapter) SupportsResume() bool { return a.adapter.SupportsResume() } + +func (a progressAdapter) SupportsCacheAccounting() bool { return a.adapter.SupportsCacheAccounting() } + +func (a progressAdapter) SupportsCostReporting() bool { return a.adapter.SupportsCostReporting() } + +func (a progressAdapter) Quota(ctx context.Context) (llm.Quota, bool, error) { + return a.adapter.Quota(ctx) +} + +func (a progressAdapter) Start(ctx context.Context, req llm.Request) (llm.Stream, error) { + return a.start(ctx, "start_llm", req, "") +} + +func (a progressAdapter) Resume(ctx context.Context, sessionID string, req llm.Request) (llm.Stream, error) { + return a.start(ctx, "resume_llm", req, sessionID) +} + +func (a progressAdapter) start(ctx context.Context, op string, req llm.Request, resumeSessionID string) (llm.Stream, error) { + fields := llmProgressFields(a.provider, a.harness, req, resumeSessionID) + span := a.logger.StartFields("review", op, "llm", fields...) + var ( + stream llm.Stream + err error + ) + if strings.TrimSpace(resumeSessionID) != "" { + stream, err = a.adapter.Resume(ctx, resumeSessionID, req) + } else { + stream, err = a.adapter.Start(ctx, req) + } + if err != nil { + span.End(err) + return nil, err + } + return progressStream{stream: stream, span: span}, nil +} + +func (s progressStream) SessionID() string { + return s.stream.SessionID() +} + +func (s progressStream) Wait(ctx context.Context) (llm.Response, error) { + resp, err := s.stream.Wait(ctx) + fields := llmResponseFields(resp) + s.span.EndFields(err, fields...) + return resp, err +} + +func llmProgressFields(provider, harness string, req llm.Request, _ string) []progress.Field { + fields := []progress.Field{} + if provider = strings.TrimSpace(provider); provider != "" { + fields = append(fields, progress.Field{Key: "provider", Value: provider}) + } + if harness = strings.TrimSpace(harness); harness != "" { + fields = append(fields, progress.Field{Key: "harness", Value: harness}) + } + if model := strings.TrimSpace(req.Model); model != "" { + fields = append(fields, progress.Field{Key: "model", Value: model}) + } + if effort := strings.TrimSpace(req.Effort); effort != "" { + fields = append(fields, progress.Field{Key: "effort", Value: effort}) + } + if logPath := strings.TrimSpace(req.LogPath); logPath != "" { + fields = append(fields, progress.Field{Key: "log_file", Value: filepath.Base(logPath)}) + } + return fields +} + +func llmResponseFields(resp llm.Response) []progress.Field { + fields := []progress.Field{} + if resp.Usage.TokensIn != nil { + fields = append(fields, progress.Field{Key: "tokens_in", Value: strconv.Itoa(*resp.Usage.TokensIn)}) + } + if resp.Usage.TokensOut != nil { + fields = append(fields, progress.Field{Key: "tokens_out", Value: strconv.Itoa(*resp.Usage.TokensOut)}) + } + if resp.Usage.CacheRead != nil { + fields = append(fields, progress.Field{Key: "cache_read", Value: strconv.Itoa(*resp.Usage.CacheRead)}) + } + if resp.Usage.CacheCreate != nil { + fields = append(fields, progress.Field{Key: "cache_create", Value: strconv.Itoa(*resp.Usage.CacheCreate)}) + } + return fields +} diff --git a/internal/cmd/reviewcmd/live_progress.go b/internal/cmd/reviewcmd/live_progress.go new file mode 100644 index 0000000..32927ac --- /dev/null +++ b/internal/cmd/reviewcmd/live_progress.go @@ -0,0 +1,61 @@ +package reviewcmd + +import ( + "context" + "strconv" + + "github.com/open-cli-collective/codereview-cli/internal/approvaloverride" + "github.com/open-cli-collective/codereview-cli/internal/ledger" + "github.com/open-cli-collective/codereview-cli/internal/pipeline" + "github.com/open-cli-collective/codereview-cli/internal/progress" + "github.com/open-cli-collective/codereview-cli/internal/reviewrun" +) + +type progressPlanner struct { + planner reviewrun.Planner + logger *progress.Logger +} + +type progressApprovalOverrideClassifier struct { + classifier approvaloverride.Classifier + logger *progress.Logger +} + +func withProgressPlanner(logger *progress.Logger, planner reviewrun.Planner) reviewrun.Planner { + if planner == nil || logger == nil { + return planner + } + return progressPlanner{planner: planner, logger: logger} +} + +func withProgressApprovalOverrideClassifier(logger *progress.Logger, classifier approvaloverride.Classifier) approvaloverride.Classifier { + if classifier == nil || logger == nil { + return classifier + } + return progressApprovalOverrideClassifier{classifier: classifier, logger: logger} +} + +func (p progressPlanner) Live(ctx context.Context, req pipeline.Request, run ledger.Run) (pipeline.Result, error) { + span := p.logger.StartFields("review", "plan_live_review", "pr", progress.Field{Key: "run_id", Value: run.RunID}) + result, err := p.planner.Live(ctx, req, run) + return result, endProgressSpanFields(span, err, progress.Field{Key: "run_id", Value: run.RunID}) +} + +func (c progressApprovalOverrideClassifier) ClassifyApprovalOverride(ctx context.Context, req approvaloverride.Request) (approvaloverride.Result, error) { + fields := []progress.Field{{Key: "candidate_count", Value: strconv.Itoa(len(req.Candidates))}} + span := c.logger.StartFields("review", "classify_approval_override", "pr", fields...) + result, err := c.classifier.ClassifyApprovalOverride(ctx, req) + return result, endProgressSpanFieldsResult(span, err, result, fields...) +} + +func endProgressSpanFields(span *progress.Span, err error, fields ...progress.Field) error { + if span != nil { + span.EndFields(err, fields...) + } + return err +} + +func endProgressSpanFieldsResult(span *progress.Span, err error, result approvaloverride.Result, fields ...progress.Field) error { + fields = append(fields, progress.Field{Key: "approve", Value: strconv.FormatBool(result.Approve)}) + return endProgressSpanFields(span, err, fields...) +} diff --git a/internal/cmd/reviewcmd/pipeline_progress.go b/internal/cmd/reviewcmd/pipeline_progress.go new file mode 100644 index 0000000..ece1b77 --- /dev/null +++ b/internal/cmd/reviewcmd/pipeline_progress.go @@ -0,0 +1,81 @@ +package reviewcmd + +import ( + "path/filepath" + "strconv" + "strings" + + "github.com/open-cli-collective/codereview-cli/internal/pipeline" + "github.com/open-cli-collective/codereview-cli/internal/progress" +) + +type pipelineTaskProgress struct { + logger *progress.Logger +} + +type pipelineTaskSpan struct { + span *progress.Span +} + +func newPipelineTaskProgress(logger *progress.Logger) pipeline.LLMTaskProgress { + if logger == nil { + return nil + } + return pipelineTaskProgress{logger: logger} +} + +func (p pipelineTaskProgress) StartLLMTask(event pipeline.LLMTaskProgressEvent) pipeline.LLMTaskProgressSpan { + return pipelineTaskSpan{ + span: p.logger.StartFields("review", "run_llm_task", "llm_task", pipelineTaskProgressFields(event)...), + } +} + +func (p pipelineTaskProgress) LoadLLMTask(event pipeline.LLMTaskProgressEvent, result pipeline.LLMTaskProgressResult) { + span := p.logger.StartFields("review", "load_llm_task", "llm_task", pipelineTaskProgressFields(event)...) + span.EndFields(nil, pipelineTaskProgressResultFields(result)...) +} + +func (s pipelineTaskSpan) End(err error, result pipeline.LLMTaskProgressResult) { + if s.span == nil { + return + } + s.span.EndFields(err, pipelineTaskProgressResultFields(result)...) +} + +func pipelineTaskProgressFields(event pipeline.LLMTaskProgressEvent) []progress.Field { + fields := []progress.Field{ + {Key: "task_id", Value: event.TaskID}, + {Key: "phase", Value: event.Phase}, + {Key: "source", Value: event.Source}, + } + if agentID := strings.TrimSpace(event.AgentID); agentID != "" { + fields = append(fields, progress.Field{Key: "agent_id", Value: agentID}) + } + if model := strings.TrimSpace(event.Model); model != "" { + fields = append(fields, progress.Field{Key: "model", Value: model}) + } + if effort := strings.TrimSpace(event.Effort); effort != "" { + fields = append(fields, progress.Field{Key: "effort", Value: effort}) + } + if logPath := strings.TrimSpace(event.LogPath); logPath != "" { + fields = append(fields, progress.Field{Key: "log_file", Value: filepath.Base(logPath)}) + } + if resumeSessionID := strings.TrimSpace(event.ResumeSessionID); resumeSessionID != "" { + fields = append(fields, progress.Field{Key: "resume_session_id", Value: resumeSessionID}) + } + return fields +} + +func pipelineTaskProgressResultFields(result pipeline.LLMTaskProgressResult) []progress.Field { + fields := []progress.Field{ + {Key: "cached", Value: strconv.FormatBool(result.Cached)}, + {Key: "task_status", Value: result.Status}, + } + if sessionID := strings.TrimSpace(result.ProviderSessionID); sessionID != "" { + fields = append(fields, progress.Field{Key: "session_id", Value: sessionID}) + } + if result.ValidationAttempts > 0 { + fields = append(fields, progress.Field{Key: "validation_attempts", Value: strconv.Itoa(result.ValidationAttempts)}) + } + return fields +} diff --git a/internal/cmd/reviewcmd/progress.go b/internal/cmd/reviewcmd/progress.go new file mode 100644 index 0000000..5a62d1a --- /dev/null +++ b/internal/cmd/reviewcmd/progress.go @@ -0,0 +1,20 @@ +package reviewcmd + +import ( + "github.com/open-cli-collective/codereview-cli/internal/cmd/root" + "github.com/open-cli-collective/codereview-cli/internal/progress" +) + +func newProgressLogger(opts *root.Options) *progress.Logger { + if opts == nil { + return progress.New(nil, true, nil) + } + return progress.New(opts.Stderr, opts.Quiet, nil) +} + +func endProgressSpan(span *progress.Span, err error) error { + if span != nil { + span.End(err) + } + return err +} diff --git a/internal/cmd/reviewcmd/provider_progress.go b/internal/cmd/reviewcmd/provider_progress.go new file mode 100644 index 0000000..7289673 --- /dev/null +++ b/internal/cmd/reviewcmd/provider_progress.go @@ -0,0 +1,140 @@ +package reviewcmd + +import ( + "context" + pathpkg "path" + "strings" + "unicode" + + "github.com/open-cli-collective/codereview-cli/internal/gitprovider" + "github.com/open-cli-collective/codereview-cli/internal/progress" +) + +type progressProvider struct { + provider gitprovider.GitProvider + logger *progress.Logger +} + +type progressRangeProvider struct { + progressProvider +} + +func withProgressProvider(logger *progress.Logger, provider gitprovider.GitProvider) gitprovider.GitProvider { + if provider == nil { + return nil + } + if logger == nil { + return provider + } + wrapped := progressProvider{provider: provider, logger: logger} + if _, ok := provider.(interface { + GetDiffBetweenRefs(context.Context, gitprovider.PRRef, string, string) (gitprovider.UnifiedDiff, error) + }); ok { + return progressRangeProvider{progressProvider: wrapped} + } + return wrapped +} + +func (p progressProvider) WhoAmI(ctx context.Context, creds gitprovider.Credential) (gitprovider.Identity, error) { + span := p.logger.Start("review", "resolve_identity", "runtime") + identity, err := p.provider.WhoAmI(ctx, creds) + return identity, endProgressSpan(span, err) +} + +func (p progressProvider) GetPR(ctx context.Context, ref gitprovider.PRRef) (gitprovider.PR, error) { + span := p.logger.Start("review", "fetch_pr", "pr") + pr, err := p.provider.GetPR(ctx, ref) + return pr, endProgressSpan(span, err) +} + +func (p progressProvider) GetDiff(ctx context.Context, ref gitprovider.PRRef) (gitprovider.UnifiedDiff, error) { + span := p.logger.Start("review", "fetch_diff", "pr") + diff, err := p.provider.GetDiff(ctx, ref) + return diff, endProgressSpan(span, err) +} + +func (p progressRangeProvider) GetDiffBetweenRefs(ctx context.Context, ref gitprovider.PRRef, baseSHA, headSHA string) (gitprovider.UnifiedDiff, error) { + rangeProvider := p.provider.(interface { + GetDiffBetweenRefs(context.Context, gitprovider.PRRef, string, string) (gitprovider.UnifiedDiff, error) + }) + span := p.logger.Start("review", "fetch_diff_between_refs", "pr") + diff, err := rangeProvider.GetDiffBetweenRefs(ctx, ref, baseSHA, headSHA) + return diff, endProgressSpan(span, err) +} + +func (p progressProvider) GetFileAtRef(ctx context.Context, ref gitprovider.PRRef, gitRef, path string) ([]byte, error) { + span := p.logger.Start("review", "read_file", fileTarget(path)) + data, err := p.provider.GetFileAtRef(ctx, ref, gitRef, path) + return data, endProgressSpan(span, err) +} + +func (p progressProvider) ListTreeAtRef(ctx context.Context, ref gitprovider.PRRef, gitRef, path string) ([]gitprovider.TreeEntry, error) { + span := p.logger.Start("review", "list_tree", fileTarget(path)) + entries, err := p.provider.ListTreeAtRef(ctx, ref, gitRef, path) + return entries, endProgressSpan(span, err) +} + +func (p progressProvider) ListInlineThreads(ctx context.Context, ref gitprovider.PRRef) ([]gitprovider.InlineThread, error) { + span := p.logger.Start("review", "list_threads", "threads") + threads, err := p.provider.ListInlineThreads(ctx, ref) + return threads, endProgressSpan(span, err) +} + +func (p progressProvider) ListReviews(ctx context.Context, ref gitprovider.PRRef) ([]gitprovider.Review, error) { + span := p.logger.Start("review", "list_reviews", "reviews") + reviews, err := p.provider.ListReviews(ctx, ref) + return reviews, endProgressSpan(span, err) +} + +func (p progressProvider) ListIssueComments(ctx context.Context, ref gitprovider.PRRef) ([]gitprovider.IssueComment, error) { + span := p.logger.Start("review", "list_issue_comments", "posts") + comments, err := p.provider.ListIssueComments(ctx, ref) + return comments, endProgressSpan(span, err) +} + +func (p progressProvider) PostInlineComment(ctx context.Context, ref gitprovider.PRRef, c gitprovider.InlineComment) (gitprovider.CommentID, error) { + span := p.logger.Start("review", "post_inline_comment", "posts") + id, err := p.provider.PostInlineComment(ctx, ref, c) + return id, endProgressSpan(span, err) +} + +func (p progressProvider) ReplyToThread(ctx context.Context, ref gitprovider.PRRef, threadID gitprovider.ThreadID, body string) (gitprovider.CommentID, error) { + span := p.logger.Start("review", "reply_thread", "posts") + id, err := p.provider.ReplyToThread(ctx, ref, threadID, body) + return id, endProgressSpan(span, err) +} + +func (p progressProvider) ResolveThread(ctx context.Context, ref gitprovider.PRRef, threadID gitprovider.ThreadID) error { + span := p.logger.Start("review", "resolve_thread", "posts") + return endProgressSpan(span, p.provider.ResolveThread(ctx, ref, threadID)) +} + +func (p progressProvider) PostIssueComment(ctx context.Context, ref gitprovider.PRRef, body string) (gitprovider.CommentID, error) { + span := p.logger.Start("review", "post_issue_comment", "posts") + id, err := p.provider.PostIssueComment(ctx, ref, body) + return id, endProgressSpan(span, err) +} + +func (p progressProvider) SubmitReview(ctx context.Context, ref gitprovider.PRRef, r gitprovider.ReviewRequest) (gitprovider.ReviewID, error) { + span := p.logger.Start("review", "submit_review", "posts") + id, err := p.provider.SubmitReview(ctx, ref, r) + return id, endProgressSpan(span, err) +} + +func (p progressProvider) Capabilities() gitprovider.ProviderCaps { + return p.provider.Capabilities() +} + +func fileTarget(pathValue string) string { + pathValue = strings.TrimSpace(pathValue) + if pathValue == "" { + return "file" + } + sanitized := strings.Map(func(r rune) rune { + if unicode.IsControl(r) { + return '_' + } + return r + }, pathValue) + return "file:" + pathpkg.Clean(sanitized) +} diff --git a/internal/cmd/reviewcmd/reviewcmd.go b/internal/cmd/reviewcmd/reviewcmd.go index 9e68eb3..ce238db 100644 --- a/internal/cmd/reviewcmd/reviewcmd.go +++ b/internal/cmd/reviewcmd/reviewcmd.go @@ -32,6 +32,7 @@ import ( "github.com/open-cli-collective/codereview-cli/internal/modelprefs" "github.com/open-cli-collective/codereview-cli/internal/outbox" "github.com/open-cli-collective/codereview-cli/internal/pipeline" + "github.com/open-cli-collective/codereview-cli/internal/progress" "github.com/open-cli-collective/codereview-cli/internal/prref" "github.com/open-cli-collective/codereview-cli/internal/review" "github.com/open-cli-collective/codereview-cli/internal/reviewplan" @@ -268,29 +269,36 @@ func runReview(ctx context.Context, cmd *cobra.Command, opts *root.Options, fact failOn = &threshold } + logger := newProgressLogger(opts) + configSpan := logger.Start("review", "load_config", "config") path, err := configPath(opts) if err != nil { - return exitcode.AuthConfig(err) + return exitcode.AuthConfig(endProgressSpan(configSpan, err)) } cfg, err := config.Load(path) if err != nil { - return cmderr.Config(err) + return cmderr.Config(endProgressSpan(configSpan, err)) } + configSpan.End(nil) + parseSpan := logger.Start("review", "parse_pr", "pr") ref, err := prref.ParseGitHubPullURL(prArg) if err != nil { - return exitcode.Usage(err) + return exitcode.Usage(endProgressSpan(parseSpan, err)) } + parseSpan.End(nil) + profileSpan := logger.Start("review", "resolve_profile", "profile") profileName, profile, err := config.ResolveProfileForRepository(cfg, opts.Profile, root.ProfileFlagChanged(cmd), config.RepositoryTarget{ Host: ref.Host, Namespace: ref.Owner, Repo: ref.Repo, }) if err != nil { - return cmderr.Config(err) + return cmderr.Config(endProgressSpan(profileSpan, err)) } if !prref.SameHost(ref.Host, profile.Git.Host) { - return exitcode.Usage(fmt.Errorf("PR host %q must match configured git host %q", ref.Host, profile.Git.Host)) + return exitcode.Usage(endProgressSpan(profileSpan, fmt.Errorf("PR host %q must match configured git host %q", ref.Host, profile.Git.Host))) } + profileSpan.End(nil) runtimeOpts := RuntimeOptions{ MaxAgents: flags.maxAgents, @@ -299,10 +307,12 @@ func runReview(ctx context.Context, cmd *cobra.Command, opts *root.Options, fact Retention: retentionPolicyFromConfig(cfg.Data.Retention), RetentionManualOnly: cfg.Data.Retention.Enforcement == config.RetentionManualOnly, } + runtimeSpan := logger.Start("review", "build_runtime", "runtime") runtime, err := factory(cmd, opts, cfg, profile, runtimeOpts) if err != nil { - return err + return endProgressSpan(runtimeSpan, err) } + runtimeSpan.End(nil) if runtime.Cleanup != nil { defer runtime.Cleanup() } @@ -335,16 +345,19 @@ func runReview(ctx context.Context, cmd *cobra.Command, opts *root.Options, fact ToolVersion: version.Version, } if !flags.dryRun { - return runLive(ctx, opts, flags, runtime.Runner, pipelineReq, failOn) + return runLive(ctx, logger, opts, flags, runtime.Runner, pipelineReq, failOn) } + execSpan := logger.Start("review", "execute_dry_run", "pr") result, err := runtime.Runner.DryRun(ctx, pipelineReq) if err != nil { - return mapRunError(err) + return endProgressSpan(execSpan, mapRunError(err)) } + execSpan.End(nil) + renderSpan := logger.Start("review", "render_result", "stdout") rendered, err := newReviewDryRun(result) if err != nil { - return err + return endProgressSpan(renderSpan, err) } if flags.jsonOutput { err = view.RenderReviewDryRunJSON(opts.Stdout, rendered) @@ -352,8 +365,9 @@ func runReview(ctx context.Context, cmd *cobra.Command, opts *root.Options, fact err = view.RenderReviewDryRunText(opts.Stdout, rendered) } if err != nil { - return err + return endProgressSpan(renderSpan, err) } + renderSpan.End(nil) if result.FailOnTriggered && failOn != nil { return exitcode.With(exitcode.Failure, fmt.Errorf("findings at or above --fail-on %s", failOn.String())) } @@ -403,11 +417,14 @@ func validateReviewSHAFlag(name, sha string) error { return nil } -func runLive(ctx context.Context, opts *root.Options, flags commandFlags, runner Runner, req pipeline.Request, failOn *review.Severity) error { +func runLive(ctx context.Context, logger *progress.Logger, opts *root.Options, flags commandFlags, runner Runner, req pipeline.Request, failOn *review.Severity) error { + execSpan := logger.Start("review", "execute_live", "pr") result, err := runner.Live(ctx, req, reviewrun.Flags{Rerun: flags.rerun, RetryPosts: flags.retryPosts}) if err != nil { - return mapRunError(err) + return endProgressSpan(execSpan, mapRunError(err)) } + execSpan.End(nil) + renderSpan := logger.Start("review", "render_result", "stdout") rendered := newReviewLive(result) if flags.jsonOutput { err = view.RenderReviewLiveJSON(opts.Stdout, rendered) @@ -415,8 +432,9 @@ func runLive(ctx context.Context, opts *root.Options, flags commandFlags, runner err = view.RenderReviewLiveText(opts.Stdout, rendered) } if err != nil { - return err + return endProgressSpan(renderSpan, err) } + renderSpan.End(nil) if result.ExitCode != exitcode.Success { return exitcode.With(result.ExitCode, liveResultError(result)) } @@ -685,6 +703,7 @@ func newRuntime(cmd *cobra.Command, opts *root.Options, cfg config.File, profile profile = normalizeRuntimeProfile(profile) stores := newRuntimeCredentialStores(cfg, opts.Backend, cmderr.BackendFlagChanged(cmd)) cleanup := stores.Close + logger := newProgressLogger(opts) providerGit := gitConfigForReviewerAuth(profile) providerStore, err := stores.Open(providerGit.Credential) if err != nil { @@ -696,6 +715,7 @@ func newRuntime(cmd *cobra.Command, opts *root.Options, cfg config.File, profile cleanup() return Runtime{}, mapRunError(err) } + provider = withProgressProvider(logger, provider) postingIdentity, err := resolvePostingIdentityForRuntime(cmd.Context(), provider, credential, providerStore, profile) if err != nil { cleanup() @@ -733,9 +753,9 @@ func newRuntime(cmd *cobra.Command, opts *root.Options, cfg config.File, profile if err != nil { return nil, err } - return adapter, nil + return withProgressAdapter(logger, adapter, string(profile.LLM.Provider), string(profile.LLM.Adapter)), nil }) - runner := buildReviewRunner(ledgerStore, provider, adapter, profile, limiter, layout, opts.Stderr, runtimeOpts) + runner := buildReviewRunner(ledgerStore, provider, adapter, profile, limiter, layout, opts.Stderr, logger, runtimeOpts) return Runtime{ Runner: runner, PostingIdentity: postingIdentity, @@ -836,7 +856,7 @@ func runtimeLayout() (statepaths.Layout, error) { return layout, nil } -func buildReviewRunner(ledgerStore *ledger.Store, provider gitprovider.GitProvider, adapter llm.Adapter, profile config.Profile, limiter outbox.Limiter, layout statepaths.Layout, warnings io.Writer, runtimeOpts RuntimeOptions) reviewRunner { +func buildReviewRunner(ledgerStore *ledger.Store, provider gitprovider.GitProvider, adapter llm.Adapter, profile config.Profile, limiter outbox.Limiter, layout statepaths.Layout, warnings io.Writer, logger *progress.Logger, runtimeOpts RuntimeOptions) reviewRunner { pipelineOpts := pipeline.Options{ Provider: provider, Adapter: adapter, @@ -844,6 +864,7 @@ func buildReviewRunner(ledgerStore *ledger.Store, provider gitprovider.GitProvid NamedSessions: ledgerStore, Layout: layout, Warnings: warnings, + TaskProgress: newPipelineTaskProgress(logger), MaxAgents: runtimeOpts.MaxAgents, MaxConcurrency: runtimeOpts.MaxConcurrency, Retention: runtimeOpts.Retention, @@ -854,12 +875,12 @@ func buildReviewRunner(ledgerStore *ledger.Store, provider gitprovider.GitProvid live: reviewrun.Options{ Store: ledgerStore, Provider: provider, - Planner: livePlanner{opts: pipelineOpts}, + Planner: withProgressPlanner(logger, livePlanner{opts: pipelineOpts}), Limiter: limiter, Layout: layout, StaleHeartbeatThreshold: 10 * time.Minute, Warnings: warnings, - ApprovalOverride: buildApprovalOverrideClassifier(profile, adapter, warnings), + ApprovalOverride: withProgressApprovalOverrideClassifier(logger, buildApprovalOverrideClassifier(profile, adapter, warnings)), Retention: runtimeOpts.Retention, RetentionManualOnly: runtimeOpts.RetentionManualOnly, }, diff --git a/internal/cmd/reviewcmd/reviewcmd_test.go b/internal/cmd/reviewcmd/reviewcmd_test.go index 2118b70..b524e20 100644 --- a/internal/cmd/reviewcmd/reviewcmd_test.go +++ b/internal/cmd/reviewcmd/reviewcmd_test.go @@ -32,6 +32,7 @@ import ( "github.com/open-cli-collective/codereview-cli/internal/llm" "github.com/open-cli-collective/codereview-cli/internal/outbox" "github.com/open-cli-collective/codereview-cli/internal/pipeline" + "github.com/open-cli-collective/codereview-cli/internal/progress" "github.com/open-cli-collective/codereview-cli/internal/review" "github.com/open-cli-collective/codereview-cli/internal/reviewplan" "github.com/open-cli-collective/codereview-cli/internal/reviewrun" @@ -403,6 +404,17 @@ func TestNewRuntimeCreatesCodexCLIWithoutOpenAIAPIKey(t *testing.T) { if runner.pipeline.Adapter == nil || runner.pipeline.Adapter.Name() != "codex_cli" { t.Fatalf("pipeline adapter = %#v, want codex_cli", runner.pipeline.Adapter) } + loadedAdapter, err := runner.pipeline.Adapter.(*lazyAdapter).get() + if err != nil { + t.Fatalf("lazy adapter get: %v", err) + } + progressAdapter, ok := loadedAdapter.(progressAdapter) + if !ok || progressAdapter.adapter == nil || progressAdapter.adapter.Name() != "codex_cli" { + t.Fatalf("loaded pipeline adapter = %#v, want wrapped codex_cli adapter", loadedAdapter) + } + if runner.pipeline.TaskProgress == nil { + t.Fatal("pipeline TaskProgress = nil, want review progress wiring") + } } func TestNewRuntimeUsesReviewerCredentialsAsRuntimeProvider(t *testing.T) { @@ -426,8 +438,9 @@ func TestNewRuntimeUsesReviewerCredentialsAsRuntimeProvider(t *testing.T) { return reviewerProvider, gitprovider.Credential{Type: "pat", Token: "reviewer-token"}, nil }, func(_ context.Context, provider gitprovider.GitProvider, credential gitprovider.Credential, _ githubprovider.TokenStore, _ config.Profile) (gitprovider.Identity, error) { - if provider != reviewerProvider || credential.Token != "reviewer-token" { - t.Fatalf("identity resolver got provider=%p credential=%#v, want reviewer provider/token", provider, credential) + wrapped, ok := provider.(progressProvider) + if !ok || wrapped.provider != reviewerProvider || credential.Token != "reviewer-token" { + t.Fatalf("identity resolver got provider=%T %#v credential=%#v, want wrapped reviewer provider/token", provider, provider, credential) } return identity, nil }, @@ -451,8 +464,13 @@ func TestNewRuntimeUsesReviewerCredentialsAsRuntimeProvider(t *testing.T) { if !ok { t.Fatalf("Runner type = %T, want reviewRunner", runtime.Runner) } - if runner.pipeline.Provider != reviewerProvider || runner.live.Provider != reviewerProvider { - t.Fatalf("runtime providers = pipeline:%p live:%p, want reviewer provider %p", runner.pipeline.Provider, runner.live.Provider, reviewerProvider) + pipelineProvider, ok := runner.pipeline.Provider.(progressProvider) + if !ok || pipelineProvider.provider != reviewerProvider { + t.Fatalf("pipeline provider = %#v, want wrapped reviewer provider", runner.pipeline.Provider) + } + liveProvider, ok := runner.live.Provider.(progressProvider) + if !ok || liveProvider.provider != reviewerProvider { + t.Fatalf("live provider = %#v, want wrapped reviewer provider", runner.live.Provider) } } @@ -1269,6 +1287,7 @@ func TestBuildReviewRunnerWiresNamedSessionDependencies(t *testing.T) { noopLimiter{}, statepaths.NewLayout(t.TempDir(), t.TempDir()), &warnings, + nil, RuntimeOptions{MaxAgents: 3, MaxConcurrency: 2, Retention: retention, RetentionManualOnly: true}, ) @@ -1426,6 +1445,7 @@ func TestReviewLiveRealRunnerHonorsConfiguredRetention(t *testing.T) { noopLimiter{}, layout, opts.Stderr, + nil, runtimeOpts, ) return Runtime{Runner: runner, PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}}, nil @@ -1508,6 +1528,7 @@ func TestReviewLiveSessionThroughRealRunnerPersistsNamedSession(t *testing.T) { noopLimiter{}, statepaths.NewLayout(t.TempDir(), t.TempDir()), opts.Stderr, + nil, runtimeOpts, ) return Runtime{Runner: runner, PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}}, nil @@ -1580,6 +1601,7 @@ func TestReviewDryRunRealRunnerHonorsConfiguredRetention(t *testing.T) { noopLimiter{}, layout, opts.Stderr, + nil, runtimeOpts, ) return Runtime{Runner: runner, PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}}, nil @@ -1649,6 +1671,7 @@ func TestReviewDryRunRealRunnerHonorsConfiguredKeepLiveForever(t *testing.T) { noopLimiter{}, layout, opts.Stderr, + nil, runtimeOpts, ) return Runtime{Runner: runner, PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}}, nil @@ -1711,6 +1734,7 @@ func TestReviewDryRunRealRunnerHonorsManualOnlyRetention(t *testing.T) { noopLimiter{}, layout, opts.Stderr, + nil, runtimeOpts, ) return Runtime{Runner: runner, PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}}, nil @@ -1798,6 +1822,407 @@ func TestReviewDryRunJSONHasNoTextQuotaPrefix(t *testing.T) { } } +func TestReviewDryRunProgressWritesToStderr(t *testing.T) { + runner := &fakeRunner{result: testPipelineResult(false)} + cmd, out, errOut := newTestCommandWithStderr(t, testConfig(), fakeFactory(runner), false) + + if err := root.Execute(cmd, []string{"review", "https://github.com/open-cli-collective/codereview-cli/pull/29", "--dry-run", "--json"}); err != nil { + t.Fatalf("Execute: %v", err) + } + stderr := errOut.String() + for _, want := range []string{ + `command="review" op="load_config" target="config"`, + `command="review" op="parse_pr" target="pr"`, + `command="review" op="resolve_profile" target="profile"`, + `command="review" op="build_runtime" target="runtime"`, + `command="review" op="execute_dry_run" target="pr"`, + `command="review" op="render_result" target="stdout"`, + } { + if !strings.Contains(stderr, want) { + t.Fatalf("stderr = %q, want substring %q", stderr, want) + } + } + if strings.Contains(out.String(), "cr progress") { + t.Fatalf("stdout leaked progress = %q", out.String()) + } +} + +func TestReviewQuietSuppressesProgressOnly(t *testing.T) { + runner := &fakeRunner{result: testPipelineResult(false)} + cmd, out, errOut := newTestCommandWithStderr(t, testConfig(), fakeFactory(runner), true) + + if err := root.Execute(cmd, []string{"review", "https://github.com/open-cli-collective/codereview-cli/pull/29", "--dry-run", "--json"}); err != nil { + t.Fatalf("Execute: %v", err) + } + if errOut.Len() != 0 { + t.Fatalf("stderr = %q, want no progress output", errOut.String()) + } + var decoded view.ReviewDryRun + if err := json.Unmarshal(out.Bytes(), &decoded); err != nil { + t.Fatalf("Unmarshal: %v\n%s", err, out.String()) + } + if decoded.Run.PostMode != "dry_run" { + t.Fatalf("decoded post mode = %q, want dry_run", decoded.Run.PostMode) + } +} + +func TestReviewQuietSuppressesProgressOnlyForTextOutput(t *testing.T) { + runner := &fakeRunner{result: testPipelineResult(false)} + cmd, out, errOut := newTestCommandWithStderr(t, testConfig(), fakeFactory(runner), true) + + if err := root.Execute(cmd, []string{"review", "https://github.com/open-cli-collective/codereview-cli/pull/29", "--dry-run"}); err != nil { + t.Fatalf("Execute: %v", err) + } + if errOut.Len() != 0 { + t.Fatalf("stderr = %q, want no progress output", errOut.String()) + } + if strings.Contains(out.String(), "cr progress") { + t.Fatalf("stdout leaked progress = %q", out.String()) + } + if !strings.Contains(out.String(), "Post mode: dry_run") { + t.Fatalf("stdout = %q, want text dry-run render", out.String()) + } +} + +func TestReviewDryRunRealRunnerQuietSuppressesProgressOnly(t *testing.T) { + cfg := testConfig() + ref, pr := reviewCommandPR() + provider := &gitprovider.Fake{} + if err := provider.SetPR(ref, pr); err != nil { + t.Fatalf("SetPR: %v", err) + } + if err := provider.SetDiff(ref, gitprovider.UnifiedDiff{Raw: reviewSmallDiff("main.go")}); err != nil { + t.Fatalf("SetDiff: %v", err) + } + adapter := &llm.FakeAdapter{NameValue: "fake-llm"} + adapter.Queue(fakeReviewLLMResult("selection-session", `{ + "schema_version": 1, + "selected_agents": [], + "thread_actions": [], + "reasoning": "no specialist needed" + }`)) + adapter.Queue(fakeReviewLLMResult("rollup-session", reviewRollupJSON("comment", nil))) + store, err := ledger.Open(context.Background(), filepath.Join(t.TempDir(), "ledger.db")) + if err != nil { + t.Fatalf("ledger.Open: %v", err) + } + t.Cleanup(func() { + if err := store.Close(); err != nil { + t.Fatalf("store.Close: %v", err) + } + }) + cmd, out, errOut := newTestCommandWithStderr(t, cfg, func(_ *cobra.Command, opts *root.Options, _ config.File, profile config.Profile, runtimeOpts RuntimeOptions) (Runtime, error) { + logger := newProgressLogger(opts) + runner := buildReviewRunner( + store, + withProgressProvider(logger, provider), + adapter, + profile, + noopLimiter{}, + statepaths.NewLayout(t.TempDir(), t.TempDir()), + opts.Stderr, + logger, + runtimeOpts, + ) + return Runtime{Runner: runner, PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}}, nil + }, true) + + if err := root.Execute(cmd, []string{"review", pr.URL, "--dry-run", "--json"}); err != nil { + t.Fatalf("Execute: %v", err) + } + if errOut.Len() != 0 { + t.Fatalf("stderr = %q, want no quiet progress output", errOut.String()) + } + var decoded view.ReviewDryRun + if err := json.Unmarshal(out.Bytes(), &decoded); err != nil { + t.Fatalf("Unmarshal: %v\n%s", err, out.String()) + } + if decoded.Run.PostMode != "dry_run" { + t.Fatalf("decoded post mode = %q, want dry_run", decoded.Run.PostMode) + } +} + +func TestReviewDryRunRealRunnerWritesGitHubProgressToStderr(t *testing.T) { + cfg := testConfig() + ref, pr := reviewCommandPR() + provider := &gitprovider.Fake{} + if err := provider.SetPR(ref, pr); err != nil { + t.Fatalf("SetPR: %v", err) + } + if err := provider.SetDiff(ref, gitprovider.UnifiedDiff{Raw: reviewSmallDiff("main.go")}); err != nil { + t.Fatalf("SetDiff: %v", err) + } + adapter := &llm.FakeAdapter{NameValue: "fake-llm"} + adapter.Queue(fakeReviewLLMResult("selection-session", `{ + "schema_version": 1, + "selected_agents": [], + "thread_actions": [], + "reasoning": "no specialist needed" + }`)) + adapter.Queue(fakeReviewLLMResult("rollup-session", reviewRollupJSON("comment", nil))) + store, err := ledger.Open(context.Background(), filepath.Join(t.TempDir(), "ledger.db")) + if err != nil { + t.Fatalf("ledger.Open: %v", err) + } + t.Cleanup(func() { + if err := store.Close(); err != nil { + t.Fatalf("store.Close: %v", err) + } + }) + cmd, out, errOut := newTestCommandWithStderr(t, cfg, func(_ *cobra.Command, opts *root.Options, _ config.File, profile config.Profile, runtimeOpts RuntimeOptions) (Runtime, error) { + logger := newProgressLogger(opts) + runner := buildReviewRunner( + store, + withProgressProvider(logger, provider), + adapter, + profile, + noopLimiter{}, + statepaths.NewLayout(t.TempDir(), t.TempDir()), + opts.Stderr, + logger, + runtimeOpts, + ) + return Runtime{Runner: runner, PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}}, nil + }, false) + + if err := root.Execute(cmd, []string{"review", pr.URL, "--dry-run", "--json"}); err != nil { + t.Fatalf("Execute: %v", err) + } + stderr := errOut.String() + for _, want := range []string{ + `command="review" op="fetch_pr" target="pr"`, + `command="review" op="fetch_diff" target="pr"`, + `command="review" op="list_threads" target="threads"`, + `command="review" op="run_llm_task" target="llm_task"`, + `event=start`, + `event=finish`, + `task_id="orchestrator-selection"`, + `phase="selection"`, + `session_id="selection-session"`, + `task_id="orchestrator-rollup"`, + `phase="rollup"`, + `session_id="rollup-session"`, + } { + if !strings.Contains(stderr, want) { + t.Fatalf("stderr = %q, want substring %q", stderr, want) + } + } + if strings.Contains(out.String(), "cr progress") { + t.Fatalf("stdout leaked progress = %q", out.String()) + } +} + +func TestProgressProviderGetPRErrorWritesErrorBreadcrumb(t *testing.T) { + var errOut bytes.Buffer + provider := &gitprovider.Fake{} + provider.SetError(gitprovider.OperationGetPR, gitprovider.WrapError(gitprovider.ErrRetryable, gitprovider.OperationGetPR, context.DeadlineExceeded)) + wrapped := withProgressProvider(progress.New(&errOut, false, nil), provider) + + _, err := wrapped.GetPR(context.Background(), gitprovider.PRRef{Host: "github.com", Owner: "open-cli-collective", Repo: "codereview-cli", Number: 29}) + if err == nil { + t.Fatal("GetPR error = nil, want provider failure") + } + stderr := errOut.String() + if !strings.Contains(stderr, `event=error`) || !strings.Contains(stderr, `command="review" op="fetch_pr" target="pr"`) { + t.Fatalf("stderr = %q, want error breadcrumb for fetch_pr", stderr) + } +} + +func TestProgressAdapterStartAndWaitWriteStructuredBreadcrumbs(t *testing.T) { + var errOut bytes.Buffer + adapter := &llm.FakeAdapter{NameValue: "fake-llm"} + tokensIn := 11 + tokensOut := 7 + adapter.Queue(llm.FakeResult{ + SessionID: "sess-123", + Response: llm.Response{ + Usage: llm.Usage{ + TokensIn: &tokensIn, + TokensOut: &tokensOut, + }, + }, + }) + wrapped := withProgressAdapter(progress.New(&errOut, false, nil), adapter, "openai", "codex_cli") + + stream, err := wrapped.Start(context.Background(), llm.Request{ + Model: "gpt-5.5", + Effort: "high", + LogPath: filepath.Join(t.TempDir(), "selector.jsonl"), + Prompt: "prompt", + }) + if err != nil { + t.Fatalf("Start: %v", err) + } + if _, err := stream.Wait(context.Background()); err != nil { + t.Fatalf("Wait: %v", err) + } + + stderr := errOut.String() + for _, want := range []string{ + `command="review" op="start_llm" target="llm"`, + `provider="openai"`, + `harness="codex_cli"`, + `model="gpt-5.5"`, + `effort="high"`, + `log_file="selector.jsonl"`, + `tokens_in="11"`, + `tokens_out="7"`, + } { + if !strings.Contains(stderr, want) { + t.Fatalf("stderr = %q, want substring %q", stderr, want) + } + } +} + +func TestProgressAdapterResumeErrorWritesErrorBreadcrumb(t *testing.T) { + var errOut bytes.Buffer + adapter := &llm.FakeAdapter{ + NameValue: "fake-llm", + SupportsResumeValue: true, + } + adapter.Queue(llm.FakeResult{StartErr: context.DeadlineExceeded}) + wrapped := withProgressAdapter(progress.New(&errOut, false, nil), adapter, "openai", "codex_cli") + + _, err := wrapped.Resume(context.Background(), "stored-session", llm.Request{Model: "gpt-5.5", Prompt: "prompt"}) + if err == nil { + t.Fatal("Resume error = nil, want failure") + } + stderr := errOut.String() + if !strings.Contains(stderr, `command="review" op="resume_llm" target="llm"`) || + !strings.Contains(stderr, `event=error`) { + t.Fatalf("stderr = %q, want resume error breadcrumb", stderr) + } +} + +func TestWithProgressProviderPreservesOptionalRangeDiffCapability(t *testing.T) { + wrapped := withProgressProvider(progress.New(io.Discard, false, nil), &gitprovider.Fake{}) + if _, ok := wrapped.(interface { + GetDiffBetweenRefs(context.Context, gitprovider.PRRef, string, string) (gitprovider.UnifiedDiff, error) + }); ok { + t.Fatalf("wrapped provider unexpectedly advertises GetDiffBetweenRefs") + } +} + +func TestFileTargetSanitizesRepoPath(t *testing.T) { + got := fileTarget(" ./dir/../a\r\nb.go ") + if got != "file:a__b.go" { + t.Fatalf("fileTarget = %q, want sanitized repo path", got) + } +} + +func TestProgressPlannerWritesRunIDBreadcrumb(t *testing.T) { + var errOut bytes.Buffer + store, err := ledger.Open(context.Background(), filepath.Join(t.TempDir(), "ledger.db")) + if err != nil { + t.Fatalf("ledger.Open: %v", err) + } + t.Cleanup(func() { + if err := store.Close(); err != nil { + t.Fatalf("store.Close: %v", err) + } + }) + provider := &gitprovider.Fake{} + ref, pr := reviewCommandPR() + if err := provider.SetPR(ref, pr); err != nil { + t.Fatalf("SetPR: %v", err) + } + if err := provider.SetDiff(ref, gitprovider.UnifiedDiff{Raw: reviewSmallDiff("main.go")}); err != nil { + t.Fatalf("SetDiff: %v", err) + } + adapter := &llm.FakeAdapter{NameValue: "fake-llm"} + adapter.Queue(fakeReviewLLMResult("selection-session", `{ + "schema_version": 1, + "selected_agents": [], + "thread_actions": [], + "reasoning": "no specialist needed" + }`)) + adapter.Queue(fakeReviewLLMResult("rollup-session", reviewRollupJSON("comment", nil))) + logger := progress.New(&errOut, false, nil) + runner := buildReviewRunner( + store, + provider, + adapter, + testConfig().Profiles["home"], + noopLimiter{}, + statepaths.NewLayout(t.TempDir(), t.TempDir()), + &errOut, + logger, + RuntimeOptions{}, + ) + + run, err := store.AllocateRun(context.Background(), ledger.AllocateRunParams{ + RunID: "run-123", + PRKey: "github_open-cli-collective_codereview-cli_29", + PRURL: pr.URL, + Profile: "home", + PostingIdentity: "review-bot", + PostMode: ledger.PostModeLive, + SHA: pr.Head.SHA, + BaseSHA: pr.Base.SHA, + StartedAt: time.Now().UTC(), + ArtifactPath: t.TempDir(), + }) + if err != nil { + t.Fatalf("AllocateRun: %v", err) + } + _, err = runner.live.Planner.Live(context.Background(), pipeline.Request{ + PRRef: ref, + PRURL: pr.URL, + ProfileName: "home", + Profile: testConfig().Profiles["home"], + PostingIdentity: gitprovider.Identity{Login: "review-bot", ID: "bot-id"}, + }, run) + if err != nil { + t.Fatalf("Planner.Live: %v", err) + } + stderr := errOut.String() + if !strings.Contains(stderr, `command="review" op="plan_live_review" target="pr"`) || + !strings.Contains(stderr, `run_id="run-123"`) { + t.Fatalf("stderr = %q, want planner breadcrumb", stderr) + } +} + +func TestReviewDryRunTextProgressWritesStructuredStderrWithoutStdoutLeak(t *testing.T) { + runner := &fakeRunner{result: testPipelineResult(false)} + cmd, out, errOut := newTestCommandWithStderr(t, testConfig(), fakeFactory(runner), false) + + if err := root.Execute(cmd, []string{"review", "https://github.com/open-cli-collective/codereview-cli/pull/29", "--dry-run"}); err != nil { + t.Fatalf("Execute: %v", err) + } + if strings.Contains(out.String(), "cr progress") { + t.Fatalf("stdout leaked progress = %q", out.String()) + } + if !strings.Contains(out.String(), "Post mode: dry_run") { + t.Fatalf("stdout = %q, want text dry-run render", out.String()) + } + assertProgressOutput(t, errOut.String(), []string{ + `command="review" op="load_config" target="config"`, + `command="review" op="execute_dry_run" target="pr"`, + `command="review" op="render_result" target="stdout"`, + }) +} + +func assertProgressOutput(t *testing.T, stderr string, wants []string) { + t.Helper() + lines := strings.Split(strings.TrimSpace(stderr), "\n") + if len(lines) == 0 || strings.TrimSpace(lines[0]) == "" { + t.Fatal("stderr has no progress lines") + } + for _, line := range lines { + if !strings.HasPrefix(line, "cr progress ") { + t.Fatalf("stderr line = %q, want progress prefix", line) + } + if !strings.Contains(line, " event=") || !strings.Contains(line, ` command="`) || !strings.Contains(line, ` op="`) || !strings.Contains(line, ` target="`) { + t.Fatalf("stderr line = %q, want structured progress fields", line) + } + } + for _, want := range wants { + if !strings.Contains(stderr, want) { + t.Fatalf("stderr = %q, want substring %q", stderr, want) + } + } +} + func TestReviewFailOnReturnsFailureAfterRendering(t *testing.T) { runner := &fakeRunner{result: testPipelineResult(true)} cmd, out := newTestCommand(t, testConfig(), fakeFactory(runner)) @@ -2034,6 +2459,7 @@ func newTestCommand(t *testing.T, cfg config.File, factory RuntimeFactory) (*cob var out bytes.Buffer cmd, opts := root.NewCommandWithOptions(&root.Options{ ConfigPath: path, + Quiet: true, Stdin: strings.NewReader(""), Stdout: &out, Stderr: &out, @@ -2042,6 +2468,25 @@ func newTestCommand(t *testing.T, cfg config.File, factory RuntimeFactory) (*cob return cmd, &out } +func newTestCommandWithStderr(t *testing.T, cfg config.File, factory RuntimeFactory, quiet bool) (*cobra.Command, *bytes.Buffer, *bytes.Buffer) { + t.Helper() + path := filepath.Join(t.TempDir(), "config.yml") + if err := config.Save(path, cfg); err != nil { + t.Fatalf("Save config: %v", err) + } + var out bytes.Buffer + var errOut bytes.Buffer + cmd, opts := root.NewCommandWithOptions(&root.Options{ + ConfigPath: path, + Quiet: quiet, + Stdin: strings.NewReader(""), + Stdout: &out, + Stderr: &errOut, + }) + RegisterWithFactory(cmd, opts, factory) + return cmd, &out, &errOut +} + func testConfig() config.File { return config.File{ Keyring: config.KeyringConfig{Backend: "memory"}, diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index add1620..1ac5458 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -76,6 +76,38 @@ type NamedSessionStore interface { GetNamedSession(context.Context, string) (ledger.NamedSession, error) } +// LLMTaskProgress records task-aware LLM pipeline breadcrumbs without owning +// command IO details. +type LLMTaskProgress interface { + StartLLMTask(LLMTaskProgressEvent) LLMTaskProgressSpan + LoadLLMTask(LLMTaskProgressEvent, LLMTaskProgressResult) +} + +// LLMTaskProgressSpan is one active LLM task breadcrumb. +type LLMTaskProgressSpan interface { + End(error, LLMTaskProgressResult) +} + +// LLMTaskProgressEvent describes one LLM task execution or reload. +type LLMTaskProgressEvent struct { + TaskID string + Phase string + AgentID string + Model string + Effort string + LogPath string + ResumeSessionID string + Source string +} + +// LLMTaskProgressResult describes the outcome of one task execution or reload. +type LLMTaskProgressResult struct { + ProviderSessionID string + Status string + ValidationAttempts int + Cached bool +} + // ContextBudget limits prompt size. A negative MaxPromptBytes disables checks. type ContextBudget struct { MaxPromptBytes int @@ -89,6 +121,7 @@ type Options struct { NamedSessions NamedSessionStore Layout statepaths.Layout Warnings io.Writer + TaskProgress LLMTaskProgress Now func() time.Time NewRunID func() string @@ -1227,6 +1260,8 @@ func runStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpe resumeSessionID = taskSessionID } } + progressEvent := newLLMTaskProgressEvent(spec, resumeSessionID) + progressSpan := startLLMTaskProgress(opts, progressEvent) started := opts.now() result, err := llm.RunStructuredWithSessionResume(ctx, opts.Adapter, resumeSessionID, llm.Request{ @@ -1261,7 +1296,10 @@ func runStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpe if strings.TrimSpace(draft.providerSessionID) != "" { session = draft.toLedger(spec.runID) if err := opts.Store.InsertSession(ctx, session); err != nil { + meta.Status = llmTaskStatusFailedBlocking + meta.ProviderSessionID = draft.providerSessionID var zero T + endLLMTaskProgress(progressSpan, err, llmTaskProgressResult(meta, result, false)) return zero, sessionDraft{}, ledger.Session{}, err } } @@ -1272,8 +1310,10 @@ func runStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpe meta.ProviderSessionID = session.ProviderSessionID if writeErr := writeLLMTaskSuccess(spec.artifacts, &meta, result.Response.StructuredOutput); writeErr != nil { var zero T + endLLMTaskProgress(progressSpan, writeErr, llmTaskProgressResult(meta, result, false)) return zero, sessionDraft{}, ledger.Session{}, writeErr } + endLLMTaskProgress(progressSpan, nil, llmTaskProgressResult(meta, result, false)) return result.Value, draft, session, nil } @@ -1283,9 +1323,11 @@ func runStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSpe meta.ProviderSessionID = session.ProviderSessionID if writeErr := writeLLMTaskFailure(spec.artifacts, &meta, result.ValidationAttempts); writeErr != nil { var zero T + endLLMTaskProgress(progressSpan, writeErr, llmTaskProgressResult(meta, result, false)) return zero, draft, session, writeErr } var zero T + endLLMTaskProgress(progressSpan, err, llmTaskProgressResult(meta, result, false)) return zero, draft, session, &llmTaskError{status: meta.Status, err: err} } @@ -1332,6 +1374,7 @@ func loadStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSp if err != nil { return zero, sessionDraft{}, ledger.Session{}, true, err } + loadLLMTaskProgress(opts, newLLMTaskProgressEvent(spec, taskResumeSessionID(meta)), llmTaskProgressResult(meta, llm.StructuredResult[T]{SessionID: meta.ProviderSessionID}, true)) return value, sessionDraftFromLedger(session), session, true, nil case llmTaskStatusFailedIsolated: if spec.llmFailureStatus != llmTaskStatusFailedIsolated { @@ -1341,6 +1384,7 @@ func loadStructuredTask[T any](ctx context.Context, opts Options, spec llmTaskSp if err != nil { return zero, sessionDraft{}, ledger.Session{}, true, err } + loadLLMTaskProgress(opts, newLLMTaskProgressEvent(spec, taskResumeSessionID(meta)), llmTaskProgressResult(meta, llm.StructuredResult[T]{SessionID: meta.ProviderSessionID}, true)) return zero, draft, session, true, &llmTaskError{status: llmTaskStatusFailedIsolated, err: errors.New(taskErrorText(meta))} case llmTaskStatusFailedBlocking: return zero, sessionDraft{}, ledger.Session{}, false, nil @@ -1496,6 +1540,73 @@ func taskErrorText(meta llmTaskMetadata) string { return fmt.Sprintf("LLM task %q failed", meta.TaskID) } +func newLLMTaskProgressEvent(spec llmTaskSpec, resumeSessionID string) LLMTaskProgressEvent { + agentID := "" + if spec.agentID != nil { + agentID = *spec.agentID + } + source := "execute" + if strings.TrimSpace(resumeSessionID) != "" { + source = "resume" + } + return LLMTaskProgressEvent{ + TaskID: spec.taskID, + Phase: spec.phase, + AgentID: agentID, + Model: spec.model, + Effort: spec.effort, + LogPath: spec.logPath, + ResumeSessionID: resumeSessionID, + Source: source, + } +} + +func llmTaskProgressResult(meta llmTaskMetadata, result any, cached bool) LLMTaskProgressResult { + out := LLMTaskProgressResult{ + ProviderSessionID: meta.ProviderSessionID, + Status: string(meta.Status), + Cached: cached, + } + switch value := result.(type) { + case llm.StructuredResult[llm.Selection]: + out.ValidationAttempts = len(value.ValidationAttempts) + if out.ProviderSessionID == "" { + out.ProviderSessionID = value.SessionID + } + case llm.StructuredResult[llm.Findings]: + out.ValidationAttempts = len(value.ValidationAttempts) + if out.ProviderSessionID == "" { + out.ProviderSessionID = value.SessionID + } + case llm.StructuredResult[review.Rollup]: + out.ValidationAttempts = len(value.ValidationAttempts) + if out.ProviderSessionID == "" { + out.ProviderSessionID = value.SessionID + } + } + return out +} + +func startLLMTaskProgress(opts Options, event LLMTaskProgressEvent) LLMTaskProgressSpan { + if opts.TaskProgress == nil { + return nil + } + return opts.TaskProgress.StartLLMTask(event) +} + +func endLLMTaskProgress(span LLMTaskProgressSpan, err error, result LLMTaskProgressResult) { + if span != nil { + span.End(err, result) + } +} + +func loadLLMTaskProgress(opts Options, event LLMTaskProgressEvent, result LLMTaskProgressResult) { + if opts.TaskProgress == nil { + return + } + opts.TaskProgress.LoadLLMTask(event, result) +} + func baseLLMTaskMetadata(opts Options, spec llmTaskSpec, draft sessionDraft) llmTaskMetadata { agentID := "" if spec.agentID != nil { diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go index ea41330..ed57cca 100644 --- a/internal/pipeline/pipeline_test.go +++ b/internal/pipeline/pipeline_test.go @@ -1274,6 +1274,132 @@ func TestRunStructuredTaskReviewerStartFailureIsBlocking(t *testing.T) { } } +func TestRunStructuredTaskReportsProgressOnExecution(t *testing.T) { + ctx := context.Background() + store := openPipelineStore(t) + defer closeStore(t, store) + artifacts := ArtifactPathsFromDir(t.TempDir()) + layout := statepaths.NewLayout(t.TempDir(), t.TempDir()) + allocatePipelineRun(t, store, layout, "run-progress", ledger.PostModeDryRun, fixedNow()) + adapter := &llm.FakeAdapter{NameValue: "fake-llm"} + adapter.Queue(fakeLLMResult("task-session", `"ok"`, 5, 3)) + progress := &fakeTaskProgress{} + spec := llmTaskSpec{ + runID: "run-progress", + taskID: orchestratorRollupStage, + phase: "rollup", + inputFingerprint: "fingerprint", + artifacts: artifacts, + role: ledger.SessionRoleOrchestrator, + model: "gpt-5.5", + effort: "high", + logPath: filepath.Join(t.TempDir(), "rollup.jsonl"), + prompt: "prompt", + } + + value, _, _, err := runStructuredTask[string](ctx, Options{ + Adapter: adapter, + Store: store, + TaskProgress: progress, + Now: fixedNow, + NewSessionRowID: sequence("session"), + }, spec, func(data []byte) (string, error) { + return string(data), nil + }) + if err != nil { + t.Fatalf("runStructuredTask: %v", err) + } + if value != `"ok"` { + t.Fatalf("value = %q, want ok payload", value) + } + if len(progress.starts) != 1 || len(progress.ends) != 1 { + t.Fatalf("progress starts=%d ends=%d, want 1/1", len(progress.starts), len(progress.ends)) + } + if progress.starts[0].TaskID != orchestratorRollupStage || progress.starts[0].Phase != "rollup" || progress.starts[0].Source != "execute" { + t.Fatalf("start = %#v, want rollup execute event", progress.starts[0]) + } + if progress.starts[0].Model != spec.model || progress.starts[0].Effort != spec.effort || progress.starts[0].LogPath != spec.logPath { + t.Fatalf("start fields = %#v, want model/effort/logPath from spec", progress.starts[0]) + } + if progress.ends[0].result.Status != string(llmTaskStatusSucceeded) || progress.ends[0].result.ProviderSessionID != "task-session" || progress.ends[0].result.Cached { + t.Fatalf("end result = %#v, want succeeded uncached task-session", progress.ends[0].result) + } +} + +func TestLoadStructuredTaskReportsCachedProgress(t *testing.T) { + ctx := context.Background() + store := openPipelineStore(t) + defer closeStore(t, store) + artifacts := ArtifactPathsFromDir(t.TempDir()) + layout := statepaths.NewLayout(t.TempDir(), t.TempDir()) + allocatePipelineRun(t, store, layout, "run-cached-progress", ledger.PostModeDryRun, fixedNow()) + spec := llmTaskSpec{ + runID: "run-cached-progress", + taskID: orchestratorSelectionStage, + phase: "selection", + inputFingerprint: "fingerprint", + artifacts: artifacts, + role: ledger.SessionRoleOrchestrator, + model: "gpt-5.4-mini", + effort: "low", + logPath: filepath.Join(t.TempDir(), "selection.jsonl"), + prompt: "prompt", + } + session := ledger.Session{ + SessionRowID: "session-row", + RunID: spec.runID, + ProviderSessionID: "cached-session", + Role: ledger.SessionRoleOrchestrator, + Adapter: "fake-llm", + Model: spec.model, + StartedAt: fixedNow(), + } + if err := store.InsertSession(ctx, session); err != nil { + t.Fatalf("InsertSession: %v", err) + } + meta := llmTaskMetadata{ + SchemaVersion: llmTaskSchemaVersion, + TaskID: spec.taskID, + Phase: spec.phase, + InputFingerprint: spec.inputFingerprint, + Adapter: "fake-llm", + Status: llmTaskStatusSucceeded, + SessionRowID: session.SessionRowID, + ProviderSessionID: session.ProviderSessionID, + ValidatedOutputPath: "", + } + if err := writeLLMTaskSuccess(artifacts, &meta, []byte(`"cached"`)); err != nil { + t.Fatalf("writeLLMTaskSuccess: %v", err) + } + progress := &fakeTaskProgress{} + + value, _, _, ok, err := loadStructuredTask[string](ctx, Options{ + Adapter: &llm.FakeAdapter{NameValue: "fake-llm"}, + Store: store, + TaskProgress: progress, + }, spec, func(data []byte) (string, error) { + return string(data), nil + }) + if err != nil { + t.Fatalf("loadStructuredTask: %v", err) + } + if !ok || strings.TrimSpace(value) != `"cached"` { + t.Fatalf("ok=%v value=%q, want cached task load", ok, value) + } + if len(progress.loads) != 1 { + t.Fatalf("progress loads=%d, want 1", len(progress.loads)) + } + if progress.loads[0].event.TaskID != orchestratorSelectionStage || progress.loads[0].event.Phase != "selection" || progress.loads[0].event.Source != "resume" { + t.Fatalf("load event = %#v, want selection resume event", progress.loads[0].event) + } + if progress.loads[0].event.ResumeSessionID != "cached-session" || progress.loads[0].event.Model != spec.model || progress.loads[0].event.Effort != spec.effort { + t.Fatalf("load event fields = %#v, want cached session/model/effort", progress.loads[0].event) + } + if !progress.loads[0].result.Cached || progress.loads[0].result.Status != string(llmTaskStatusSucceeded) || progress.loads[0].result.ProviderSessionID != "cached-session" { + t.Fatalf("load = %#v, want cached succeeded selection task", progress.loads[0]) + } +} + func assertTaskPayloadContains(t *testing.T, path, want string) { t.Helper() if strings.TrimSpace(path) == "" { @@ -1288,6 +1414,48 @@ func assertTaskPayloadContains(t *testing.T, path, want string) { } } +type fakeTaskProgress struct { + mu sync.Mutex + starts []LLMTaskProgressEvent + ends []fakeTaskProgressEnd + loads []fakeTaskProgressLoad +} + +type fakeTaskProgressSpan struct { + parent *fakeTaskProgress + event LLMTaskProgressEvent +} + +type fakeTaskProgressEnd struct { + event LLMTaskProgressEvent + err error + result LLMTaskProgressResult +} + +type fakeTaskProgressLoad struct { + event LLMTaskProgressEvent + result LLMTaskProgressResult +} + +func (f *fakeTaskProgress) StartLLMTask(event LLMTaskProgressEvent) LLMTaskProgressSpan { + f.mu.Lock() + defer f.mu.Unlock() + f.starts = append(f.starts, event) + return fakeTaskProgressSpan{parent: f, event: event} +} + +func (f *fakeTaskProgress) LoadLLMTask(event LLMTaskProgressEvent, result LLMTaskProgressResult) { + f.mu.Lock() + defer f.mu.Unlock() + f.loads = append(f.loads, fakeTaskProgressLoad{event: event, result: result}) +} + +func (s fakeTaskProgressSpan) End(err error, result LLMTaskProgressResult) { + s.parent.mu.Lock() + defer s.parent.mu.Unlock() + s.parent.ends = append(s.parent.ends, fakeTaskProgressEnd{event: s.event, err: err, result: result}) +} + func TestDryRunReviewerModelTierOverrideAppliesOnlyToReviewers(t *testing.T) { ctx := context.Background() store := openPipelineStore(t) diff --git a/internal/progress/progress.go b/internal/progress/progress.go index 761e417..bf1f59c 100644 --- a/internal/progress/progress.go +++ b/internal/progress/progress.go @@ -3,6 +3,7 @@ package progress import ( "io" + "sort" "strconv" "strings" "sync" @@ -20,12 +21,19 @@ type Logger struct { mu sync.Mutex } +// Field is one additional structured progress attribute. +type Field struct { + Key string + Value string +} + // Span represents one started progress operation. type Span struct { logger *Logger command string op string target string + fields []Field started time.Time done bool } @@ -40,11 +48,18 @@ func New(w io.Writer, disabled bool, now Clock) *Logger { // Start writes a start line and returns a span that can be ended exactly once. func (l *Logger) Start(command, op, target string) *Span { + return l.StartFields(command, op, target) +} + +// StartFields writes a start line with additional structured fields and returns +// a span that can be ended exactly once. +func (l *Logger) StartFields(command, op, target string, fields ...Field) *Span { span := &Span{ logger: l, command: command, op: op, target: target, + fields: cloneFields(fields), started: l.now(), } l.writeLine(progressLine{ @@ -52,6 +67,7 @@ func (l *Logger) Start(command, op, target string) *Span { command: command, op: op, target: target, + fields: span.fields, }) return span } @@ -59,6 +75,12 @@ func (l *Logger) Start(command, op, target string) *Span { // End writes a finish or error line. It is a no-op for disabled loggers or a // span that has already ended. func (s *Span) End(err error) { + s.EndFields(err) +} + +// EndFields writes a finish or error line with additional structured fields. +// It is a no-op for disabled loggers or a span that has already ended. +func (s *Span) EndFields(err error, fields ...Field) { if s == nil || s.logger == nil || s.done || s.logger.disabled { return } @@ -68,6 +90,7 @@ func (s *Span) End(err error) { command: s.command, op: s.op, target: s.target, + fields: mergeFields(s.fields, fields), durationMS: dur, } if err != nil { @@ -86,6 +109,7 @@ type progressLine struct { command string op string target string + fields []Field durationMS int64 status string errSummary string @@ -113,6 +137,15 @@ func (l *Logger) writeLine(line progressLine) { b.WriteString(" target=") b.WriteString(quoteValue(line.target)) } + for _, field := range normalizeFields(line.fields) { + if field.Key == "" { + continue + } + b.WriteByte(' ') + b.WriteString(field.Key) + b.WriteByte('=') + b.WriteString(quoteValue(field.Value)) + } if line.durationMS > 0 || line.event == "finish" || line.event == "error" { b.WriteString(" duration_ms=") b.WriteString(strconv.FormatInt(line.durationMS, 10)) @@ -185,3 +218,56 @@ func durationMS(d time.Duration) int64 { } return d.Milliseconds() } + +func cloneFields(fields []Field) []Field { + if len(fields) == 0 { + return nil + } + cloned := make([]Field, 0, len(fields)) + for _, field := range fields { + key := strings.TrimSpace(field.Key) + if key == "" { + continue + } + cloned = append(cloned, Field{Key: key, Value: field.Value}) + } + return cloned +} + +func mergeFields(base, extra []Field) []Field { + if len(base) == 0 && len(extra) == 0 { + return nil + } + merged := cloneFields(base) + if len(extra) == 0 { + return merged + } + extraCloned := cloneFields(extra) + if len(extraCloned) == 0 { + return merged + } + byKey := make(map[string]int, len(merged)) + for i, field := range merged { + byKey[field.Key] = i + } + for _, field := range extraCloned { + if index, ok := byKey[field.Key]; ok { + merged[index].Value = field.Value + continue + } + byKey[field.Key] = len(merged) + merged = append(merged, field) + } + return merged +} + +func normalizeFields(fields []Field) []Field { + if len(fields) == 0 { + return nil + } + normalized := cloneFields(fields) + sort.SliceStable(normalized, func(i, j int) bool { + return normalized[i].Key < normalized[j].Key + }) + return normalized +} diff --git a/internal/progress/progress_test.go b/internal/progress/progress_test.go index 2a76e03..769dfc9 100644 --- a/internal/progress/progress_test.go +++ b/internal/progress/progress_test.go @@ -55,6 +55,32 @@ func TestLoggerEndErrorSanitizesSummary(t *testing.T) { } } +func TestLoggerFieldsAreRenderedAndSorted(t *testing.T) { + var out bytes.Buffer + clock := &fixedClock{ + times: []time.Time{ + time.Unix(40, 0), + time.Unix(40, 15*int64(time.Millisecond)), + }, + } + logger := New(&out, false, clock.Now) + + span := logger.StartFields("review", "run_llm", "llm", + Field{Key: "session_id", Value: "pending"}, + Field{Key: "model", Value: "gpt-5.5"}, + Field{Key: "provider", Value: "openai"}, + ) + span.EndFields(nil, Field{Key: "session_id", Value: "sess-123"}) + + got := out.String() + if !strings.Contains(got, `command="review" op="run_llm" target="llm" model="gpt-5.5" provider="openai" session_id="pending"`) { + t.Fatalf("start line = %q", got) + } + if !strings.Contains(got, `command="review" op="run_llm" target="llm" model="gpt-5.5" provider="openai" session_id="sess-123" duration_ms=15 status=ok`) { + t.Fatalf("finish line = %q", got) + } +} + func TestDisabledLoggerWritesNothing(t *testing.T) { var out bytes.Buffer logger := New(&out, true, time.Now)