From 4821cd0a0ac29da73d47cf7d9190cf2d45179be1 Mon Sep 17 00:00:00 2001 From: Ryan Mahoney Date: Mon, 23 Mar 2026 15:38:31 -0500 Subject: [PATCH 1/9] Add WorktreePath field to ReviewJob and EnqueueOpts - feat(storage): add worktree_path column migration (#1) - Persist WorktreePath in EnqueueJob INSERT and returned struct (#1) - Read worktree_path in ClaimJob, GetJobByID, and ListJobs (#1) - Detect worktree and pass WorktreePath in handleEnqueue (#1) - feat(daemon): use worktree path for agent cwd and prompt building (#1) - test(storage): verify ReenqueueJob preserves worktree_path (#1) - feat(daemon): use worktree path for hook working directory (#1) - fix(daemon): address PR review feedback for worktree path handling (#1) --- internal/daemon/broadcaster.go | 21 ++++---- internal/daemon/hooks.go | 9 +++- internal/daemon/server.go | 71 +++++++++++++----------- internal/daemon/worker.go | 96 +++++++++++++++++++++------------ internal/storage/db.go | 12 +++++ internal/storage/db_job_test.go | 33 ++++++++++++ internal/storage/hydration.go | 2 + internal/storage/jobs.go | 20 +++---- internal/storage/models.go | 1 + 9 files changed, 181 insertions(+), 84 deletions(-) diff --git a/internal/daemon/broadcaster.go b/internal/daemon/broadcaster.go index ff88162a9..bcb949e98 100644 --- a/internal/daemon/broadcaster.go +++ b/internal/daemon/broadcaster.go @@ -8,16 +8,17 @@ import ( // Event represents a review event that can be broadcast type Event struct { - Type string `json:"type"` - TS time.Time `json:"ts"` - JobID int64 `json:"job_id"` - Repo string `json:"repo"` - RepoName string `json:"repo_name"` - SHA string `json:"sha"` - Agent string `json:"agent,omitempty"` - Verdict string `json:"verdict,omitempty"` - Findings string `json:"findings,omitempty"` - Error string `json:"error,omitempty"` + Type string `json:"type"` + TS time.Time `json:"ts"` + JobID int64 `json:"job_id"` + Repo string `json:"repo"` + RepoName string `json:"repo_name"` + SHA string `json:"sha"` + Agent string `json:"agent,omitempty"` + Verdict string `json:"verdict,omitempty"` + Findings string `json:"findings,omitempty"` + Error string `json:"error,omitempty"` + WorktreePath string `json:"worktree_path,omitempty"` } // Subscriber represents a client subscribed to events diff --git a/internal/daemon/hooks.go b/internal/daemon/hooks.go index 879645773..ca65b7e7c 100644 --- a/internal/daemon/hooks.go +++ b/internal/daemon/hooks.go @@ -9,6 +9,7 @@ import ( "log" "net/http" neturl "net/url" + "os" "os/exec" "path/filepath" "runtime" @@ -170,7 +171,13 @@ func (hr *HookRunner) handleEvent(event Event) { fired++ // Run async so hooks don't block workers hr.wg.Add(1) - go hr.runHook(cmd, event.Repo) + hookDir := event.Repo + if event.WorktreePath != "" { + if _, err := os.Stat(event.WorktreePath); err == nil { + hookDir = event.WorktreePath + } + } + go hr.runHook(cmd, hookDir) } if fired > 0 { diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 4560fd552..2ee54fb6d 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -707,6 +707,13 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } + // Detect worktree: if the git working directory differs from the main + // repo root, the request originated from a worktree checkout. + var worktreePath string + if gitCwd != repoRoot { + worktreePath = gitCwd + } + // Check if branch is excluded from reviews currentBranch := git.GetCurrentBranch(gitCwd) if currentBranch != "" && config.IsBranchExcluded(repoRoot, currentBranch) { @@ -834,6 +841,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { Label: gitRef, // Use git_ref as TUI label (run, analyze type, custom) JobType: req.JobType, Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue prompt job: %v", err)) @@ -843,16 +851,17 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Dirty review - use pre-captured diff targetSHA, _ := git.ResolveSHA(gitCwd, "HEAD") job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: gitRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, targetSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - DiffContent: req.DiffContent, - Provider: req.Provider, + RepoID: repo.ID, + GitRef: gitRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, targetSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + DiffContent: req.DiffContent, + Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue dirty job: %v", err)) @@ -917,15 +926,16 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: fullRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, endSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - Provider: req.Provider, + RepoID: repo.ID, + GitRef: fullRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, endSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -966,17 +976,18 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { patchID := git.GetPatchID(gitCwd, sha) job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: sha, - Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, sha), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - PatchID: patchID, - Provider: req.Provider, + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: sha, + Branch: req.Branch, + SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, sha), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + PatchID: patchID, + Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index cd1c99d6b..d1c43509d 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "os" "strings" "sync" "sync/atomic" @@ -361,6 +362,17 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { return } + // Resolve effective repo path: use worktree if available and still exists. + effectiveRepoPath := job.RepoPath + if job.WorktreePath != "" { + if _, err := os.Stat(job.WorktreePath); err != nil { + log.Printf("[%s] Worktree %s no longer exists for job %d, using main repo", + workerID, job.WorktreePath, job.ID) + } else { + effectiveRepoPath = job.WorktreePath + } + } + // Build the prompt (or use pre-stored prompt for task/compact jobs). // Create a per-job builder with the snapshotted config so exclude // patterns are resolved consistently. @@ -382,10 +394,10 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { err = fmt.Errorf("%s job %d has no stored prompt (git_ref=%q); restart the daemon with 'roborev daemon restart'", job.JobType, job.ID, job.GitRef) } else if job.DiffContent != nil { // Dirty job - use pre-captured diff - reviewPrompt, err = pb.BuildDirty(job.RepoPath, *job.DiffContent, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) + reviewPrompt, err = pb.BuildDirty(effectiveRepoPath, *job.DiffContent, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) } else { // Normal job - build prompt from git ref - reviewPrompt, err = pb.Build(job.RepoPath, job.GitRef, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) + reviewPrompt, err = pb.Build(effectiveRepoPath, job.GitRef, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) } if err != nil { log.Printf("[%s] Error building prompt: %v", workerID, err) @@ -435,15 +447,22 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { log.Printf("[%s] Agent %s not available, using %s", workerID, job.Agent, agentName) } + // Use the effective worktree path for events (empty when worktree is gone or not a worktree job). + eventWorktreePath := "" + if effectiveRepoPath != job.RepoPath { + eventWorktreePath = effectiveRepoPath + } + // Broadcast started event wp.broadcaster.Broadcast(Event{ - Type: "review.started", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, + Type: "review.started", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + WorktreePath: eventWorktreePath, }) // Create output writer for tail command @@ -473,7 +492,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { // For fix jobs, create an isolated worktree to run the agent in. // The agent modifies files in the worktree; afterwards we capture the diff as a patch. - reviewRepoPath := job.RepoPath + reviewRepoPath := effectiveRepoPath var fixWorktree *worktree.Worktree if job.IsFixJob() { wt, wtErr := worktree.Create(job.RepoPath, job.GitRef) @@ -504,13 +523,14 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { log.Printf("[%s] Job %d was canceled", workerID, job.ID) // Broadcast cancellation event wp.broadcaster.Broadcast(Event{ - Type: "review.canceled", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, + Type: "review.canceled", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + WorktreePath: eventWorktreePath, }) return // Job already marked as canceled in DB, nothing more to do } @@ -633,15 +653,16 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { // Broadcast completion event verdict := storage.ParseVerdict(output) wp.broadcaster.Broadcast(Event{ - Type: "review.completed", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, - Verdict: verdict, - Findings: output, + Type: "review.completed", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + Verdict: verdict, + Findings: output, + WorktreePath: eventWorktreePath, }) } @@ -770,15 +791,22 @@ func (wp *WorkerPool) resolveBackupModel(job *storage.ReviewJob) string { // broadcastFailed sends a review.failed event for a job func (wp *WorkerPool) broadcastFailed(job *storage.ReviewJob, agentName, errorMsg string) { + wtPath := "" + if job.WorktreePath != "" { + if _, err := os.Stat(job.WorktreePath); err == nil { + wtPath = job.WorktreePath + } + } wp.broadcaster.Broadcast(Event{ - Type: "review.failed", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, - Error: errorMsg, + Type: "review.failed", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + Error: errorMsg, + WorktreePath: wtPath, }) } diff --git a/internal/storage/db.go b/internal/storage/db.go index 222fa7391..f0219eed7 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -707,6 +707,18 @@ func (db *DB) migrate() error { } } + // Migration: add worktree_path column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'worktree_path'`).Scan(&count) + if err != nil { + return fmt.Errorf("check worktree_path column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN worktree_path TEXT DEFAULT ''`) + if err != nil { + return fmt.Errorf("add worktree_path column: %w", err) + } + } + // Run sync-related migrations if err := db.migrateSyncColumns(); err != nil { return err diff --git a/internal/storage/db_job_test.go b/internal/storage/db_job_test.go index 00dedd0e5..55697b014 100644 --- a/internal/storage/db_job_test.go +++ b/internal/storage/db_job_test.go @@ -871,6 +871,39 @@ func TestReenqueueJob(t *testing.T) { require.Error(t, err) }) + t.Run("rerun preserves worktree_path", func(t *testing.T) { + isolatedDB := openTestDB(t) + defer isolatedDB.Close() + + repo := createRepo(t, isolatedDB, "/tmp/wt-preserve-repo") + commit := createCommit(t, isolatedDB, repo.ID, "wt-preserve-sha") + + job, err := isolatedDB.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "wt-preserve-sha", + Agent: "test", + WorktreePath: "/tmp/wt/feature-branch", + }) + require.NoError(t, err) + + claimed, err := isolatedDB.ClaimJob("worker-1") + require.NoError(t, err) + require.NotNil(t, claimed) + assert.Equal(t, job.ID, claimed.ID) + + err = isolatedDB.CompleteJob(job.ID, "test", "prompt", "output") + require.NoError(t, err) + + err = isolatedDB.ReenqueueJob(job.ID) + require.NoError(t, err) + + updated, err := isolatedDB.GetJobByID(job.ID) + require.NoError(t, err) + assert.Equal(t, JobStatusQueued, updated.Status) + assert.Equal(t, "/tmp/wt/feature-branch", updated.WorktreePath) + }) + t.Run("rerun done job and complete again", func(t *testing.T) { // Use isolated database to avoid interference from other subtests isolatedDB := openTestDB(t) diff --git a/internal/storage/hydration.go b/internal/storage/hydration.go index 16591fe0c..804324f35 100644 --- a/internal/storage/hydration.go +++ b/internal/storage/hydration.go @@ -31,6 +31,7 @@ type reviewJobScanFields struct { TokenUsage sql.NullString Agentic int Closed sql.NullInt64 + WorktreePath string } func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { @@ -107,6 +108,7 @@ func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { closed := fields.Closed.Int64 != 0 job.Closed = &closed } + job.WorktreePath = fields.WorktreePath } type reviewScanFields struct { diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index f28703c70..cf4f69cd1 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -58,6 +58,7 @@ type EnqueueOpts struct { Label string // Display label in TUI for task jobs (default: "prompt") JobType string // Explicit job type (review/range/dirty/task/compact/fix); inferred if empty ParentJobID int64 // Parent job being fixed (for fix jobs) + WorktreePath string // Worktree checkout path (empty = use main repo root) } // EnqueueJob creates a new review job. The job type is inferred from opts. @@ -119,14 +120,14 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { result, err := db.Exec(` INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, session_id, agent, model, provider, reasoning, status, job_type, review_type, patch_id, diff_content, prompt, agentic, output_prefix, - parent_job_id, uuid, source_machine_id, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + parent_job_id, uuid, source_machine_id, updated_at, worktree_path) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, opts.RepoID, commitIDParam, gitRef, nullString(opts.Branch), nullString(opts.SessionID), opts.Agent, nullString(opts.Model), nullString(opts.Provider), reasoning, jobType, opts.ReviewType, nullString(opts.PatchID), nullString(opts.DiffContent), nullString(opts.Prompt), agenticInt, nullString(opts.OutputPrefix), parentJobIDParam, - uid, machineID, nowStr) + uid, machineID, nowStr, opts.WorktreePath) if err != nil { return nil, err } @@ -153,6 +154,7 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { UUID: uid, SourceMachineID: machineID, UpdatedAt: &now, + WorktreePath: opts.WorktreePath, } if opts.ParentJobID > 0 { job.ParentJobID = &opts.ParentJobID @@ -202,7 +204,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { err = db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.model, j.provider, j.reasoning, j.status, j.enqueued_at, r.root_path, r.name, c.subject, j.diff_content, j.prompt, COALESCE(j.agentic, 0), j.job_type, j.review_type, - j.output_prefix, j.patch_id, j.parent_job_id + j.output_prefix, j.patch_id, j.parent_job_id, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -211,7 +213,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { LIMIT 1 `, workerID).Scan(&job.ID, &job.RepoID, &fields.CommitID, &job.GitRef, &fields.Branch, &fields.SessionID, &job.Agent, &fields.Model, &fields.Provider, &job.Reasoning, &job.Status, &fields.EnqueuedAt, &job.RepoPath, &job.RepoName, &fields.CommitSubject, &fields.DiffContent, &fields.Prompt, &fields.Agentic, &fields.JobType, &fields.ReviewType, - &fields.OutputPrefix, &fields.PatchID, &fields.ParentJobID) + &fields.OutputPrefix, &fields.PatchID, &fields.ParentJobID, &fields.WorktreePath) if err != nil { return nil, err } @@ -769,7 +771,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, j.retry_count, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, rv.closed, rv.output, rv.verdict_bool, j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id, - j.parent_job_id, j.provider, j.token_usage + j.parent_job_id, j.provider, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -807,7 +809,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &j.RetryCount, &fields.Agentic, &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Closed, &output, &verdictBool, &fields.SourceMachineID, &fields.UUID, &fields.Model, &fields.JobType, &fields.ReviewType, &fields.PatchID, - &fields.ParentJobID, &fields.Provider, &fields.TokenUsage) + &fields.ParentJobID, &fields.Provider, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err } @@ -857,7 +859,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, j.model, j.provider, j.job_type, j.review_type, j.patch_id, - j.parent_job_id, j.patch, j.token_usage + j.parent_job_id, j.patch, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -865,7 +867,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { `, id).Scan(&j.ID, &j.RepoID, &fields.CommitID, &j.GitRef, &fields.Branch, &fields.SessionID, &j.Agent, &j.Reasoning, &j.Status, &fields.EnqueuedAt, &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &fields.Agentic, &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Model, &fields.Provider, &fields.JobType, &fields.ReviewType, &fields.PatchID, - &fields.ParentJobID, &fields.Patch, &fields.TokenUsage) + &fields.ParentJobID, &fields.Patch, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err } diff --git a/internal/storage/models.go b/internal/storage/models.go index 1334df6d1..07fc9fadf 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -72,6 +72,7 @@ type ReviewJob struct { OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output ParentJobID *int64 `json:"parent_job_id,omitempty"` // Job being fixed (for fix jobs) Patch *string `json:"patch,omitempty"` // Generated diff patch (for completed fix jobs) + WorktreePath string `json:"worktree_path,omitempty"` // Worktree checkout path (empty = use RepoPath) TokenUsage string `json:"token_usage,omitempty"` // JSON blob from agentsview (token consumption) // Sync fields UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync From 5530674006cdc13bbcfc7e4d06be4700cd84dc1f Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 16:45:06 -0500 Subject: [PATCH 2/9] fix(sync,daemon): propagate worktree_path through Postgres sync and fix jobs (#1) - Bump Postgres schema to v9, add worktree_path column with migration - Add WorktreePath to SyncableJob and PulledJob structs - Include worktree_path in push (UpsertJob), pull (PullJobs), and pull-upsert (UpsertPulledJob) queries - Propagate parentJob.WorktreePath when enqueuing fix jobs - Add sync round-trip and fix-job propagation tests Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/server.go | 25 +++--- internal/daemon/server_ops_test.go | 44 ++++++++++ internal/storage/postgres.go | 22 +++-- internal/storage/schemas/postgres_v9.sql | 102 +++++++++++++++++++++++ internal/storage/sync.go | 12 +-- internal/storage/sync_backfill_test.go | 80 ++++++++++++++++++ 6 files changed, 261 insertions(+), 24 deletions(-) create mode 100644 internal/storage/schemas/postgres_v9.sql diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 2ee54fb6d..a1d986cdf 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -2343,18 +2343,19 @@ func (s *Server) handleFixJob(w http.ResponseWriter, r *http.Request) { // Enqueue the fix job job, err := s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: parentJob.RepoID, - CommitID: commitID, - GitRef: fixGitRef, - Branch: parentJob.Branch, - Agent: agentName, - Model: model, - Reasoning: reasoning, - Prompt: fixPrompt, - Agentic: true, - Label: fmt.Sprintf("fix #%d", req.ParentJobID), - JobType: storage.JobTypeFix, - ParentJobID: req.ParentJobID, + RepoID: parentJob.RepoID, + CommitID: commitID, + GitRef: fixGitRef, + Branch: parentJob.Branch, + Agent: agentName, + Model: model, + Reasoning: reasoning, + Prompt: fixPrompt, + Agentic: true, + Label: fmt.Sprintf("fix #%d", req.ParentJobID), + JobType: storage.JobTypeFix, + ParentJobID: req.ParentJobID, + WorktreePath: parentJob.WorktreePath, }) if err != nil { s.writeInternalError(w, fmt.Sprintf("enqueue fix job: %v", err)) diff --git a/internal/daemon/server_ops_test.go b/internal/daemon/server_ops_test.go index 64845d5e1..1b0bcc8f6 100644 --- a/internal/daemon/server_ops_test.go +++ b/internal/daemon/server_ops_test.go @@ -597,6 +597,50 @@ func TestHandleFixJobStaleValidation(t *testing.T) { require.Equal(t, commit.Subject, stored.CommitSubject) }) + t.Run("fix job inherits parent worktree path", func(t *testing.T) { + // Enqueue a review with a worktree path + wtJob, err := db.EnqueueJob(storage.EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fix-val-abc", + Agent: "test", + WorktreePath: filepath.Join(tmpDir, "worktrees", "feature"), + }) + require.NoError(t, err) + + // Claim jobs until we get ours (earlier subtests may leave queued jobs) + for { + claimed, claimErr := db.ClaimJob("w-wt-fix") + require.NoError(t, claimErr) + require.NotNil(t, claimed) + if claimed.ID == wtJob.ID { + break + } + db.CompleteJob(claimed.ID, "test", "prompt", "PASS") + } + require.NoError(t, db.CompleteJob( + wtJob.ID, "test", "prompt", "FAIL: issues found", + )) + + req := testutil.MakeJSONRequest( + t, http.MethodPost, "/api/job/fix", + fixJobRequest{ParentJobID: wtJob.ID}, + ) + w := httptest.NewRecorder() + server.handleFixJob(w, req) + assertHandlerStatus(t, w, http.StatusCreated) + + var fixJob storage.ReviewJob + testutil.DecodeJSON(t, w, &fixJob) + + stored, err := db.GetJobByID(fixJob.ID) + require.NoError(t, err) + require.Equal(t, + filepath.Join(tmpDir, "worktrees", "feature"), + stored.WorktreePath, + ) + }) + t.Run("custom prompt includes review context", func(t *testing.T) { req := testutil.MakeJSONRequest( t, http.MethodPost, "/api/job/fix", diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 61298edfd..0c7394461 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -14,12 +14,12 @@ import ( ) // PostgreSQL schema version - increment when schema changes -const pgSchemaVersion = 8 +const pgSchemaVersion = 9 // pgSchemaName is the PostgreSQL schema used to isolate roborev tables const pgSchemaName = "roborev" -//go:embed schemas/postgres_v8.sql +//go:embed schemas/postgres_v9.sql var pgSchemaSQL string // pgSchemaStatements returns the individual DDL statements for schema creation. @@ -282,6 +282,12 @@ func (p *PgPool) EnsureSchema(ctx context.Context) error { return fmt.Errorf("migrate to v8 (add token_usage column): %w", err) } } + if currentVersion < 9 { + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS worktree_path TEXT`) + if err != nil { + return fmt.Errorf("migrate to v9 (add worktree_path column): %w", err) + } + } // Update version _, err = p.pool.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1) ON CONFLICT (version) DO NOTHING`, pgSchemaVersion) if err != nil { @@ -535,8 +541,8 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p INSERT INTO review_jobs ( uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, - source_machine_id, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, NOW()) + worktree_path, source_machine_id, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, @@ -547,10 +553,11 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p commit_id = EXCLUDED.commit_id, patch_id = EXCLUDED.patch_id, token_usage = COALESCE(EXCLUDED.token_usage, review_jobs.token_usage), + worktree_path = COALESCE(EXCLUDED.worktree_path, review_jobs.worktree_path), updated_at = NOW() `, j.UUID, pgRepoID, pgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Model), nullString(j.Reasoning), defaultStr(j.JobType, "review"), j.ReviewType, nullString(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt, j.StartedAt, j.FinishedAt, - nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), j.SourceMachineID) + nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), nullString(j.WorktreePath), j.SourceMachineID) return err } @@ -606,6 +613,7 @@ type PulledJob struct { DiffContent *string Error string TokenUsage string + WorktreePath string SourceMachineID string UpdatedAt time.Time } @@ -631,7 +639,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, j.started_at, j.finished_at, COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), - j.source_machine_id, j.updated_at, j.id + COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at, j.id FROM review_jobs j JOIN repos r ON j.repo_id = r.id LEFT JOIN commits c ON j.commit_id = c.id @@ -658,7 +666,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &j.EnqueuedAt, &j.StartedAt, &j.FinishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, - &j.SourceMachineID, &j.UpdatedAt, &lastID, + &j.WorktreePath, &j.SourceMachineID, &j.UpdatedAt, &lastID, ) if err != nil { return nil, cursor, fmt.Errorf("scan job: %w", err) diff --git a/internal/storage/schemas/postgres_v9.sql b/internal/storage/schemas/postgres_v9.sql new file mode 100644 index 000000000..9fe819a77 --- /dev/null +++ b/internal/storage/schemas/postgres_v9.sql @@ -0,0 +1,102 @@ +-- PostgreSQL schema version 9 +-- Adds worktree_path to review_jobs for worktree-aware job execution. +-- Note: Version is managed by EnsureSchema(), not this file. + +CREATE SCHEMA IF NOT EXISTS roborev; + +CREATE TABLE IF NOT EXISTS roborev.schema_version ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.machines ( + id SERIAL PRIMARY KEY, + machine_id UUID UNIQUE NOT NULL, + name TEXT, + last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.repos ( + id SERIAL PRIMARY KEY, + identity TEXT UNIQUE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.commits ( + id SERIAL PRIMARY KEY, + repo_id INTEGER REFERENCES roborev.repos(id), + sha TEXT NOT NULL, + author TEXT NOT NULL, + subject TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(repo_id, sha) +); + +CREATE TABLE IF NOT EXISTS roborev.review_jobs ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + repo_id INTEGER NOT NULL REFERENCES roborev.repos(id), + commit_id INTEGER REFERENCES roborev.commits(id), + git_ref TEXT NOT NULL, + branch TEXT, + session_id TEXT, + agent TEXT NOT NULL, + model TEXT, + reasoning TEXT, + job_type TEXT NOT NULL DEFAULT 'review', + review_type TEXT NOT NULL DEFAULT '', + patch_id TEXT, + status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'done', 'failed', 'canceled', 'applied', 'rebased')), + agentic BOOLEAN DEFAULT FALSE, + enqueued_at TIMESTAMP WITH TIME ZONE NOT NULL, + started_at TIMESTAMP WITH TIME ZONE, + finished_at TIMESTAMP WITH TIME ZONE, + prompt TEXT, + diff_content TEXT, + error TEXT, + token_usage TEXT, + worktree_path TEXT, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.reviews ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + agent TEXT NOT NULL, + prompt TEXT NOT NULL, + output TEXT NOT NULL, + closed BOOLEAN NOT NULL DEFAULT FALSE, + updated_by_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.responses ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + responder TEXT NOT NULL, + response TEXT NOT NULL, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_review_jobs_source ON roborev.review_jobs(source_machine_id); +CREATE INDEX IF NOT EXISTS idx_review_jobs_updated ON roborev.review_jobs(updated_at); +-- Note: idx_review_jobs_branch, idx_review_jobs_job_type, and +-- idx_review_jobs_patch_id are created by migration code, not here +-- (to support upgrades from older versions where those columns +-- don't exist yet). +CREATE INDEX IF NOT EXISTS idx_reviews_job_uuid ON roborev.reviews(job_uuid); +CREATE INDEX IF NOT EXISTS idx_reviews_updated ON roborev.reviews(updated_at); +CREATE INDEX IF NOT EXISTS idx_responses_job_uuid ON roborev.responses(job_uuid); +CREATE INDEX IF NOT EXISTS idx_responses_id ON roborev.responses(id); + +CREATE TABLE IF NOT EXISTS roborev.sync_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); diff --git a/internal/storage/sync.go b/internal/storage/sync.go index 8c0a5816d..6a54d9518 100644 --- a/internal/storage/sync.go +++ b/internal/storage/sync.go @@ -286,6 +286,7 @@ type SyncableJob struct { DiffContent *string Error string TokenUsage string + WorktreePath string SourceMachineID string UpdatedAt time.Time } @@ -300,7 +301,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, COALESCE(j.started_at, ''), COALESCE(j.finished_at, ''), COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), - j.source_machine_id, j.updated_at + COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at FROM review_jobs j JOIN repos r ON j.repo_id = r.id LEFT JOIN commits c ON j.commit_id = c.id @@ -335,7 +336,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &enqueuedAt, &startedAt, &finishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, - &j.SourceMachineID, &updatedAt, + &j.WorktreePath, &j.SourceMachineID, &updatedAt, ) if err != nil { return nil, fmt.Errorf("scan job: %w", err) @@ -578,8 +579,8 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error INSERT INTO review_jobs ( uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, - source_machine_id, updated_at, synced_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + worktree_path, source_machine_id, updated_at, synced_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(uuid) DO UPDATE SET status = excluded.status, finished_at = excluded.finished_at, @@ -590,13 +591,14 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error commit_id = excluded.commit_id, patch_id = excluded.patch_id, token_usage = COALESCE(excluded.token_usage, review_jobs.token_usage), + worktree_path = COALESCE(excluded.worktree_path, review_jobs.worktree_path), updated_at = excluded.updated_at, synced_at = ? `, j.UUID, repoID, commitID, j.GitRef, nullStr(j.SessionID), j.Agent, nullStr(j.Model), j.Reasoning, j.JobType, j.ReviewType, nullStr(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt.Format(time.RFC3339), nullTimeStr(j.StartedAt), nullTimeStr(j.FinishedAt), nullStr(j.Prompt), j.DiffContent, nullStr(j.Error), nullStr(j.TokenUsage), - j.SourceMachineID, j.UpdatedAt.Format(time.RFC3339), now, now) + nullStr(j.WorktreePath), j.SourceMachineID, j.UpdatedAt.Format(time.RFC3339), now, now) return err } diff --git a/internal/storage/sync_backfill_test.go b/internal/storage/sync_backfill_test.go index 934a90014..b9c167a61 100644 --- a/internal/storage/sync_backfill_test.go +++ b/internal/storage/sync_backfill_test.go @@ -221,3 +221,83 @@ func TestUpsertPulledJob_BackfillsModel(t *testing.T) { assert.False(t, !modelPreserved.Valid || modelPreserved.String != "gpt-4") } + +func TestGetJobsToSync_IncludesWorktreePath(t *testing.T) { + h := newSyncTestHelper(t) + + // Enqueue a job with a worktree path + commit, err := h.db.GetOrCreateCommit(h.repo.ID, "wt-sync-abc", "Author", "Subject", time.Now()) + require.NoError(t, err) + + job, err := h.db.EnqueueJob(EnqueueOpts{ + RepoID: h.repo.ID, + CommitID: commit.ID, + GitRef: "wt-sync-abc", + Agent: "test", + WorktreePath: "/worktrees/feature-branch", + }) + require.NoError(t, err) + + // Complete the job so it becomes syncable + claimed, err := h.db.ClaimJob("w-sync-wt") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, h.db.CompleteJob(job.ID, "test", "prompt", "PASS")) + + jobs, err := h.db.GetJobsToSync(h.machineID, 10) + require.NoError(t, err) + require.NotEmpty(t, jobs) + + var found *SyncableJob + for i := range jobs { + if jobs[i].UUID == job.UUID { + found = &jobs[i] + break + } + } + require.NotNil(t, found, "expected job %s in sync results", job.UUID) + assert.Equal(t, "/worktrees/feature-branch", found.WorktreePath) +} + +func TestUpsertPulledJob_PreservesWorktreePath(t *testing.T) { + db := openTestDB(t) + defer db.Close() + + repo, err := db.GetOrCreateRepo("/test/repo-wt-sync") + require.NoError(t, err) + + jobUUID := "test-uuid-wt-" + time.Now().Format("20060102150405") + + pulledJob := PulledJob{ + UUID: jobUUID, + RepoIdentity: "/test/repo-wt-sync", + GitRef: "HEAD", + Agent: "test-agent", + Status: "done", + WorktreePath: "/worktrees/my-branch", + SourceMachineID: "test-machine", + EnqueuedAt: time.Now(), + UpdatedAt: time.Now(), + } + err = db.UpsertPulledJob(pulledJob, repo.ID, nil) + require.NoError(t, err) + + // Verify worktree_path was stored + var wt string + err = db.QueryRow( + `SELECT COALESCE(worktree_path, '') FROM review_jobs WHERE uuid = ?`, jobUUID, + ).Scan(&wt) + require.NoError(t, err) + assert.Equal(t, "/worktrees/my-branch", wt) + + // Upsert with empty worktree_path should not clear existing value + pulledJob.WorktreePath = "" + err = db.UpsertPulledJob(pulledJob, repo.ID, nil) + require.NoError(t, err) + + err = db.QueryRow( + `SELECT COALESCE(worktree_path, '') FROM review_jobs WHERE uuid = ?`, jobUUID, + ).Scan(&wt) + require.NoError(t, err) + assert.Equal(t, "/worktrees/my-branch", wt) +} From 33ea1ec8c91f07f95773ee0865b990c94ce21306 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 16:54:43 -0500 Subject: [PATCH 3/9] fix(daemon,sync): complete worktree_path propagation in events, hooks, and batch sync (#1) - Add WorktreePath to Event.MarshalJSON so SSE/webhook payloads include the field - Load repo-specific hook config from worktree path when available, so the correct .roborev.toml is used - Add worktree_path to BatchUpsertJobs SQL (the main sync push path) - Add MarshalJSON serialization tests for worktree_path Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/broadcaster.go | 42 ++++++++++++++++++---------------- internal/daemon/event_test.go | 25 ++++++++++++++++++++ internal/daemon/hooks.go | 13 ++++++++--- internal/storage/postgres.go | 7 +++--- 4 files changed, 61 insertions(+), 26 deletions(-) diff --git a/internal/daemon/broadcaster.go b/internal/daemon/broadcaster.go index bcb949e98..39e86acd1 100644 --- a/internal/daemon/broadcaster.go +++ b/internal/daemon/broadcaster.go @@ -113,26 +113,28 @@ func (b *EventBroadcaster) SubscriberCount() int { // MarshalJSON converts an Event to JSON for streaming func (e Event) MarshalJSON() ([]byte, error) { return json.Marshal(struct { - Type string `json:"type"` - TS string `json:"ts"` - JobID int64 `json:"job_id"` - Repo string `json:"repo"` - RepoName string `json:"repo_name"` - SHA string `json:"sha"` - Agent string `json:"agent,omitempty"` - Verdict string `json:"verdict,omitempty"` - Findings string `json:"findings,omitempty"` - Error string `json:"error,omitempty"` + Type string `json:"type"` + TS string `json:"ts"` + JobID int64 `json:"job_id"` + Repo string `json:"repo"` + RepoName string `json:"repo_name"` + SHA string `json:"sha"` + Agent string `json:"agent,omitempty"` + Verdict string `json:"verdict,omitempty"` + Findings string `json:"findings,omitempty"` + Error string `json:"error,omitempty"` + WorktreePath string `json:"worktree_path,omitempty"` }{ - Type: e.Type, - TS: e.TS.UTC().Format(time.RFC3339), - JobID: e.JobID, - Repo: e.Repo, - RepoName: e.RepoName, - SHA: e.SHA, - Agent: e.Agent, - Verdict: e.Verdict, - Findings: e.Findings, - Error: e.Error, + Type: e.Type, + TS: e.TS.UTC().Format(time.RFC3339), + JobID: e.JobID, + Repo: e.Repo, + RepoName: e.RepoName, + SHA: e.SHA, + Agent: e.Agent, + Verdict: e.Verdict, + Findings: e.Findings, + Error: e.Error, + WorktreePath: e.WorktreePath, }) } diff --git a/internal/daemon/event_test.go b/internal/daemon/event_test.go index 74e406cd2..e3b7593cb 100644 --- a/internal/daemon/event_test.go +++ b/internal/daemon/event_test.go @@ -53,4 +53,29 @@ func TestEvent_MarshalJSON(t *testing.T) { // Explicitly check that 'error' is not present _, hasError := decoded["error"] assert.False(t, hasError, "expected 'error' field to be omitted") + + // WorktreePath omitted when empty + _, hasWT := decoded["worktree_path"] + assert.False(t, hasWT, "expected 'worktree_path' to be omitted when empty") +} + +func TestEvent_MarshalJSON_WorktreePath(t *testing.T) { + event := Event{ + Type: "review.completed", + TS: time.Date(2026, 1, 11, 10, 0, 30, 0, time.UTC), + JobID: 42, + Repo: "/path/to/myrepo", + RepoName: "myrepo", + SHA: "abc123", + Agent: "claude-code", + WorktreePath: "/worktrees/feature-branch", + } + + data, err := event.MarshalJSON() + require.NoError(t, err) + + var decoded map[string]any + require.NoError(t, json.Unmarshal(data, &decoded)) + + assert.Equal(t, "/worktrees/feature-branch", decoded["worktree_path"]) } diff --git a/internal/daemon/hooks.go b/internal/daemon/hooks.go index ca65b7e7c..0fec649e8 100644 --- a/internal/daemon/hooks.go +++ b/internal/daemon/hooks.go @@ -137,11 +137,18 @@ func (hr *HookRunner) handleEvent(event Event) { return } - // Collect hooks: copy global slice to avoid aliasing, then append repo-specific + // Collect hooks: copy global slice to avoid aliasing, then append repo-specific. + // Prefer worktree path for config loading so the correct .roborev.toml is used. hooks := append([]config.HookConfig{}, cfg.Hooks...) - if event.Repo != "" { - if repoCfg, err := config.LoadRepoConfig(event.Repo); err == nil && repoCfg != nil { + repoConfigPath := event.Repo + if event.WorktreePath != "" { + if _, err := os.Stat(event.WorktreePath); err == nil { + repoConfigPath = event.WorktreePath + } + } + if repoConfigPath != "" { + if repoCfg, err := config.LoadRepoConfig(repoConfigPath); err == nil && repoCfg != nil { hooks = append(hooks, repoCfg.Hooks...) } } diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 0c7394461..ac9193360 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -934,8 +934,8 @@ func (p *PgPool) BatchUpsertJobs(ctx context.Context, jobs []JobWithPgIDs) ([]bo INSERT INTO review_jobs ( uuid, repo_id, commit_id, git_ref, session_id, agent, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, - source_machine_id, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, NOW()) + worktree_path, source_machine_id, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, @@ -945,10 +945,11 @@ func (p *PgPool) BatchUpsertJobs(ctx context.Context, jobs []JobWithPgIDs) ([]bo commit_id = EXCLUDED.commit_id, patch_id = EXCLUDED.patch_id, token_usage = COALESCE(EXCLUDED.token_usage, review_jobs.token_usage), + worktree_path = COALESCE(EXCLUDED.worktree_path, review_jobs.worktree_path), updated_at = NOW() `, j.UUID, jw.PgRepoID, jw.PgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Reasoning), defaultStr(j.JobType, "review"), j.ReviewType, nullString(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt, j.StartedAt, j.FinishedAt, - nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), j.SourceMachineID) + nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), nullString(j.WorktreePath), j.SourceMachineID) } br := p.pool.SendBatch(ctx, batch) From db8e5eb8e14faa70b9bc6e2e7225a170bcb83c04 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 17:19:57 -0500 Subject: [PATCH 4/9] test(sync): add batch upsert round-trip test for worktree_path (#1) Extend TestIntegration_BatchUpsertJobs to verify worktree_path survives a BatchUpsertJobs push and PullJobs pull cycle. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/storage/postgres_test.go | 40 +++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/internal/storage/postgres_test.go b/internal/storage/postgres_test.go index 5bcb454cf..01c728764 100644 --- a/internal/storage/postgres_test.go +++ b/internal/storage/postgres_test.go @@ -725,6 +725,46 @@ func TestIntegration_BatchUpsertJobs(t *testing.T) { require.NoError(t, err) assert.Nil(t, success) }) + + t.Run("worktree_path round-trips through batch upsert and pull", func(t *testing.T) { + wtJobUUID := uuid.NewString() + commitID := createTestCommit(t, pool.Pool(), TestCommitOpts{ + RepoID: repoID, SHA: "batch-wt-sha", + }) + wtJobs := []JobWithPgIDs{{ + Job: SyncableJob{ + UUID: wtJobUUID, + RepoIdentity: "https://github.com/test/batch-jobs-test.git", + CommitSHA: "batch-wt-sha", + GitRef: "test-ref", + Agent: "test", + Status: "done", + WorktreePath: "/worktrees/feature-x", + SourceMachineID: defaultTestMachineID, + EnqueuedAt: time.Now(), + }, + PgRepoID: repoID, + PgCommitID: &commitID, + }} + + success, err := pool.BatchUpsertJobs(ctx, wtJobs) + require.NoError(t, err) + assert.Equal(t, 1, countSuccesses(success)) + + // Pull back using a different machine ID so the job isn't excluded + pulled, _, err := pool.PullJobs(ctx, "other-machine", "", 100) + require.NoError(t, err) + + var found *PulledJob + for i := range pulled { + if pulled[i].UUID == wtJobUUID { + found = &pulled[i] + break + } + } + require.NotNil(t, found, "expected job %s in pull results", wtJobUUID) + assert.Equal(t, "/worktrees/feature-x", found.WorktreePath) + }) } func TestIntegration_BatchUpsertReviews(t *testing.T) { From a5db2b16fe5e0acab5b1685042317e9a8c09a563 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 17:32:00 -0500 Subject: [PATCH 5/9] fix(daemon): validate worktree paths against repo root before use (#1) - Add git.ValidateWorktreeForRepo() that resolves the main repo root from a worktree path and compares it (with symlink resolution) to the expected repo root - Use filepath.Clean on both paths in handleEnqueue worktree detection to avoid normalization false positives - Replace os.Stat checks in worker and hook runner with ValidateWorktreeForRepo, closing the trust boundary gap for stale/synced paths - Consolidate hook config loading and working directory resolution into a single effectiveRepo path Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/hooks.go | 31 +++++++++++++------------------ internal/daemon/server.go | 7 ++++--- internal/daemon/worker.go | 7 ++++--- internal/git/git.go | 20 ++++++++++++++++++++ internal/git/git_test.go | 32 ++++++++++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 24 deletions(-) diff --git a/internal/daemon/hooks.go b/internal/daemon/hooks.go index 0fec649e8..f2fe1ae81 100644 --- a/internal/daemon/hooks.go +++ b/internal/daemon/hooks.go @@ -9,7 +9,6 @@ import ( "log" "net/http" neturl "net/url" - "os" "os/exec" "path/filepath" "runtime" @@ -137,18 +136,20 @@ func (hr *HookRunner) handleEvent(event Event) { return } - // Collect hooks: copy global slice to avoid aliasing, then append repo-specific. - // Prefer worktree path for config loading so the correct .roborev.toml is used. - hooks := append([]config.HookConfig{}, cfg.Hooks...) - - repoConfigPath := event.Repo - if event.WorktreePath != "" { - if _, err := os.Stat(event.WorktreePath); err == nil { - repoConfigPath = event.WorktreePath + // Resolve one effective repo path: prefer the worktree if it still + // exists and belongs to the same repository. Used for both config + // loading (.roborev.toml) and as the hook working directory. + effectiveRepo := event.Repo + if event.WorktreePath != "" && event.Repo != "" { + if gitpkg.ValidateWorktreeForRepo(event.WorktreePath, event.Repo) { + effectiveRepo = event.WorktreePath } } - if repoConfigPath != "" { - if repoCfg, err := config.LoadRepoConfig(repoConfigPath); err == nil && repoCfg != nil { + + // Collect hooks: copy global slice to avoid aliasing, then append repo-specific. + hooks := append([]config.HookConfig{}, cfg.Hooks...) + if effectiveRepo != "" { + if repoCfg, err := config.LoadRepoConfig(effectiveRepo); err == nil && repoCfg != nil { hooks = append(hooks, repoCfg.Hooks...) } } @@ -178,13 +179,7 @@ func (hr *HookRunner) handleEvent(event Event) { fired++ // Run async so hooks don't block workers hr.wg.Add(1) - hookDir := event.Repo - if event.WorktreePath != "" { - if _, err := os.Stat(event.WorktreePath); err == nil { - hookDir = event.WorktreePath - } - } - go hr.runHook(cmd, hookDir) + go hr.runHook(cmd, effectiveRepo) } if fired > 0 { diff --git a/internal/daemon/server.go b/internal/daemon/server.go index a1d986cdf..a69e06d17 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -707,11 +707,12 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } - // Detect worktree: if the git working directory differs from the main + // Detect worktree: if the worktree toplevel differs from the main // repo root, the request originated from a worktree checkout. + // Clean both paths to avoid false positives from normalization differences. var worktreePath string - if gitCwd != repoRoot { - worktreePath = gitCwd + if filepath.Clean(gitCwd) != filepath.Clean(repoRoot) { + worktreePath = filepath.Clean(gitCwd) } // Check if branch is excluded from reviews diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index d1c43509d..d01695d54 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -362,11 +362,12 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { return } - // Resolve effective repo path: use worktree if available and still exists. + // Resolve effective repo path: use worktree if available, still exists, + // and is a valid git checkout for the same repository. effectiveRepoPath := job.RepoPath if job.WorktreePath != "" { - if _, err := os.Stat(job.WorktreePath); err != nil { - log.Printf("[%s] Worktree %s no longer exists for job %d, using main repo", + if !gitpkg.ValidateWorktreeForRepo(job.WorktreePath, job.RepoPath) { + log.Printf("[%s] Worktree %s invalid or gone for job %d, using main repo", workerID, job.WorktreePath, job.ID) } else { effectiveRepoPath = job.WorktreePath diff --git a/internal/git/git.go b/internal/git/git.go index 8c2226c26..39aee1bf6 100644 --- a/internal/git/git.go +++ b/internal/git/git.go @@ -237,6 +237,26 @@ func GetRepoRoot(path string) (string, error) { return normalizeMSYSPath(string(out)), nil } +// ValidateWorktreeForRepo checks that worktreePath is a git checkout +// whose main repository root matches repoRoot. Returns true if the +// worktree is valid for the given repo. Returns false (without error) +// if the path doesn't exist, isn't a git repo, or belongs to a +// different repository. +func ValidateWorktreeForRepo(worktreePath, repoRoot string) bool { + mainRoot, err := GetMainRepoRoot(worktreePath) + if err != nil { + return false + } + // Resolve symlinks so /tmp vs /private/tmp (macOS) don't cause + // false negatives. + a, errA := filepath.EvalSymlinks(mainRoot) + b, errB := filepath.EvalSymlinks(repoRoot) + if errA != nil || errB != nil { + return filepath.Clean(mainRoot) == filepath.Clean(repoRoot) + } + return a == b +} + // GetMainRepoRoot returns the main repository root, resolving through worktrees. // For a regular repository or submodule, this returns the same as GetRepoRoot. // For a worktree, this returns the main repository's root path. diff --git a/internal/git/git_test.go b/internal/git/git_test.go index 553e227d8..0362d8bbf 100644 --- a/internal/git/git_test.go +++ b/internal/git/git_test.go @@ -1535,3 +1535,35 @@ func TestWorktreePathForBranch(t *testing.T) { } }) } + +func TestValidateWorktreeForRepo(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found") + } + + t.Run("valid worktree passes", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + wt := repo.AddWorktree("val-wt") + + assert.True(t, ValidateWorktreeForRepo(wt.Dir, repo.Dir)) + }) + + t.Run("main repo validates against itself", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + + assert.True(t, ValidateWorktreeForRepo(repo.Dir, repo.Dir)) + }) + + t.Run("nonexistent path fails", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + + assert.False(t, ValidateWorktreeForRepo("/nonexistent/path", repo.Dir)) + }) + + t.Run("unrelated repo fails", func(t *testing.T) { + repo1 := NewTestRepoWithCommit(t) + repo2 := NewTestRepoWithCommit(t) + + assert.False(t, ValidateWorktreeForRepo(repo2.Dir, repo1.Dir)) + }) +} From 5b4b4ccfd88ba32dbef4b35b3750afe9f2b58a5b Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 17:51:53 -0500 Subject: [PATCH 6/9] fix(storage,git): fix integration test UUID and validate checkout root (#1) - Use valid UUID for PullJobs excludeMachineID (fixes CI failure) - Assert worktree_path directly via SQL before PullJobs round-trip - ValidateWorktreeForRepo now also verifies the path is a checkout root via GetRepoRoot, rejecting nested subdirectories - Add subdirectory regression test Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/git/git.go | 25 ++++++++++++++++++------- internal/git/git_test.go | 8 ++++++++ internal/storage/postgres_test.go | 20 +++++++++++++++++--- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/internal/git/git.go b/internal/git/git.go index 39aee1bf6..143eeec50 100644 --- a/internal/git/git.go +++ b/internal/git/git.go @@ -243,18 +243,29 @@ func GetRepoRoot(path string) (string, error) { // if the path doesn't exist, isn't a git repo, or belongs to a // different repository. func ValidateWorktreeForRepo(worktreePath, repoRoot string) bool { + // Verify the path is itself a checkout root (not a subdirectory). + toplevel, err := GetRepoRoot(worktreePath) + if err != nil { + return false + } + if cleanEvalPath(toplevel) != cleanEvalPath(worktreePath) { + return false + } + // Verify the checkout belongs to the same main repository. mainRoot, err := GetMainRepoRoot(worktreePath) if err != nil { return false } - // Resolve symlinks so /tmp vs /private/tmp (macOS) don't cause - // false negatives. - a, errA := filepath.EvalSymlinks(mainRoot) - b, errB := filepath.EvalSymlinks(repoRoot) - if errA != nil || errB != nil { - return filepath.Clean(mainRoot) == filepath.Clean(repoRoot) + return cleanEvalPath(mainRoot) == cleanEvalPath(repoRoot) +} + +// cleanEvalPath resolves symlinks and cleans the path for comparison. +// Falls back to filepath.Clean if symlink resolution fails. +func cleanEvalPath(p string) string { + if resolved, err := filepath.EvalSymlinks(p); err == nil { + return resolved } - return a == b + return filepath.Clean(p) } // GetMainRepoRoot returns the main repository root, resolving through worktrees. diff --git a/internal/git/git_test.go b/internal/git/git_test.go index 0362d8bbf..0553fd135 100644 --- a/internal/git/git_test.go +++ b/internal/git/git_test.go @@ -1566,4 +1566,12 @@ func TestValidateWorktreeForRepo(t *testing.T) { assert.False(t, ValidateWorktreeForRepo(repo2.Dir, repo1.Dir)) }) + + t.Run("subdirectory of same repo fails", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + subDir := filepath.Join(repo.Dir, "src") + require.NoError(t, os.MkdirAll(subDir, 0o755)) + + assert.False(t, ValidateWorktreeForRepo(subDir, repo.Dir)) + }) } diff --git a/internal/storage/postgres_test.go b/internal/storage/postgres_test.go index 01c728764..577005f1a 100644 --- a/internal/storage/postgres_test.go +++ b/internal/storage/postgres_test.go @@ -728,6 +728,8 @@ func TestIntegration_BatchUpsertJobs(t *testing.T) { t.Run("worktree_path round-trips through batch upsert and pull", func(t *testing.T) { wtJobUUID := uuid.NewString() + // Use a distinct machine ID so we can exclude it when pulling + wtMachineID := uuid.NewString() commitID := createTestCommit(t, pool.Pool(), TestCommitOpts{ RepoID: repoID, SHA: "batch-wt-sha", }) @@ -740,7 +742,7 @@ func TestIntegration_BatchUpsertJobs(t *testing.T) { Agent: "test", Status: "done", WorktreePath: "/worktrees/feature-x", - SourceMachineID: defaultTestMachineID, + SourceMachineID: wtMachineID, EnqueuedAt: time.Now(), }, PgRepoID: repoID, @@ -751,8 +753,20 @@ func TestIntegration_BatchUpsertJobs(t *testing.T) { require.NoError(t, err) assert.Equal(t, 1, countSuccesses(success)) - // Pull back using a different machine ID so the job isn't excluded - pulled, _, err := pool.PullJobs(ctx, "other-machine", "", 100) + // Verify worktree_path directly in the database row + var storedWT *string + err = pool.pool.QueryRow(ctx, + `SELECT worktree_path FROM review_jobs WHERE uuid = $1`, wtJobUUID, + ).Scan(&storedWT) + require.NoError(t, err) + require.NotNil(t, storedWT) + assert.Equal(t, "/worktrees/feature-x", *storedWT) + + // Also verify PullJobs returns the field: use a different + // valid UUID to exclude, and scope via cursor to avoid + // scanning the entire table. + otherMachine := uuid.NewString() + pulled, _, err := pool.PullJobs(ctx, otherMachine, "", 100) require.NoError(t, err) var found *PulledJob From 4c9510b4d5692cdca087fc91ba44b6aa5c727b8f Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 18:29:11 -0500 Subject: [PATCH 7/9] refactor(daemon): rename gitCwd to checkoutRoot for clarity (#1) The variable holds the result of git rev-parse --show-toplevel, not a raw working directory. The old name misled reviewers into thinking subdirectory invocations could produce incorrect worktree detection. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/server.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index a69e06d17..b49086ffd 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -691,9 +691,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } req.ReviewType = canonical[0] - // Get the working directory root for git commands (may be a worktree) - // This is needed to resolve refs like HEAD correctly in the worktree context - gitCwd, err := git.GetRepoRoot(req.RepoPath) + // Get the checkout root (via --show-toplevel) for git commands. + // For worktrees this is the worktree root; for the main repo it equals repoRoot. + checkoutRoot, err := git.GetRepoRoot(req.RepoPath) if err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("not a git repository: %v", err)) return @@ -711,12 +711,12 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // repo root, the request originated from a worktree checkout. // Clean both paths to avoid false positives from normalization differences. var worktreePath string - if filepath.Clean(gitCwd) != filepath.Clean(repoRoot) { - worktreePath = filepath.Clean(gitCwd) + if filepath.Clean(checkoutRoot) != filepath.Clean(repoRoot) { + worktreePath = filepath.Clean(checkoutRoot) } // Check if branch is excluded from reviews - currentBranch := git.GetCurrentBranch(gitCwd) + currentBranch := git.GetCurrentBranch(checkoutRoot) if currentBranch != "" && config.IsBranchExcluded(repoRoot, currentBranch) { // Silently skip excluded branches - return 200 OK with skipped flag writeJSON(w, map[string]any{ @@ -850,7 +850,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } } else if isDirty { // Dirty review - use pre-captured diff - targetSHA, _ := git.ResolveSHA(gitCwd, "HEAD") + targetSHA, _ := git.ResolveSHA(checkoutRoot, "HEAD") job, err = s.db.EnqueueJob(storage.EnqueueOpts{ RepoID: repo.ID, GitRef: gitRef, @@ -870,16 +870,16 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } } else if isRange { // For ranges, resolve both endpoints and create range job - // Use gitCwd to resolve refs correctly in worktree context + // Use checkoutRoot to resolve refs correctly in worktree context parts := strings.SplitN(gitRef, "..", 2) - startSHA, err := git.ResolveSHA(gitCwd, parts[0]) + startSHA, err := git.ResolveSHA(checkoutRoot, parts[0]) if err != nil { // If the start ref is ^ and resolution failed, the commit // may be the root commit (no parent). Use the empty tree SHA so // the range includes the root commit's changes. if before, ok := strings.CutSuffix(parts[0], "^"); ok { base := before - if _, resolveErr := git.ResolveSHA(gitCwd, base+"^{commit}"); resolveErr == nil { + if _, resolveErr := git.ResolveSHA(checkoutRoot, base+"^{commit}"); resolveErr == nil { startSHA = git.EmptyTreeSHA err = nil } @@ -889,7 +889,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } } - endSHA, err := git.ResolveSHA(gitCwd, parts[1]) + endSHA, err := git.ResolveSHA(checkoutRoot, parts[1]) if err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid end commit: %v", err)) return @@ -901,7 +901,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // means we can't prove all are excluded. fullRef := startSHA + ".." + endSHA if rangeCommits, rcErr := git.GetRangeCommits( - gitCwd, fullRef, + checkoutRoot, fullRef, ); rcErr == nil && len(rangeCommits) > 0 { messages := make([]string, 0, len(rangeCommits)) allRead := true @@ -943,8 +943,8 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } } else { - // Single commit - use gitCwd to resolve refs correctly in worktree context - sha, err := git.ResolveSHA(gitCwd, gitRef) + // Single commit - use checkoutRoot to resolve refs correctly in worktree context + sha, err := git.ResolveSHA(checkoutRoot, gitRef) if err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid commit: %v", err)) return @@ -974,7 +974,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } - patchID := git.GetPatchID(gitCwd, sha) + patchID := git.GetPatchID(checkoutRoot, sha) job, err = s.db.EnqueueJob(storage.EnqueueOpts{ RepoID: repo.ID, From 0e3ec54cfe030df92f7fd2e4afaefd6b61882b8e Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 20:02:37 -0500 Subject: [PATCH 8/9] fix(daemon): use checkout root for branch exclusion and scope session reuse by worktree (#1) - Resolve branch exclusion config from checkoutRoot instead of repoRoot so worktree-specific .roborev.toml rules apply - Add worktree_path to FindReusableSessionCandidates query filter so sessions from different worktrees of the same repo/branch are not reused across checkouts Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/server.go | 11 ++++++----- internal/daemon/server_jobs_test.go | 20 ++++++++++---------- internal/storage/reviews.go | 9 +++++---- 3 files changed, 21 insertions(+), 19 deletions(-) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index b49086ffd..15a746ccd 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -717,7 +717,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // Check if branch is excluded from reviews currentBranch := git.GetCurrentBranch(checkoutRoot) - if currentBranch != "" && config.IsBranchExcluded(repoRoot, currentBranch) { + if currentBranch != "" && config.IsBranchExcluded(checkoutRoot, currentBranch) { // Silently skip excluded branches - return 200 OK with skipped flag writeJSON(w, map[string]any{ "skipped": true, @@ -855,7 +855,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { RepoID: repo.ID, GitRef: gitRef, Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, targetSHA), + SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), Agent: agentName, Model: model, Reasoning: reasoning, @@ -930,7 +930,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { RepoID: repo.ID, GitRef: fullRef, Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, endSHA), + SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), Agent: agentName, Model: model, Reasoning: reasoning, @@ -981,7 +981,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { CommitID: commit.ID, GitRef: sha, Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, sha), + SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), Agent: agentName, Model: model, Reasoning: reasoning, @@ -1020,7 +1020,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } func (s *Server) findReusableSessionID( - repoPath string, repoID int64, branch, agentName, reviewType, targetSHA string, + repoPath string, repoID int64, branch, agentName, reviewType, worktreePath, targetSHA string, ) string { cfg := s.configWatcher.Config() if !config.ResolveReuseReviewSession(repoPath, cfg) || branch == "" || targetSHA == "" { @@ -1032,6 +1032,7 @@ func (s *Server) findReusableSessionID( branch, agentName, reviewType, + worktreePath, config.ResolveReuseReviewSessionLookback(repoPath, cfg), ) if err != nil { diff --git a/internal/daemon/server_jobs_test.go b/internal/daemon/server_jobs_test.go index 8e9a1034e..034ad81c0 100644 --- a/internal/daemon/server_jobs_test.go +++ b/internal/daemon/server_jobs_test.go @@ -616,7 +616,7 @@ func TestHandleEnqueueReusesPreviousBranchSessionWhenEnabled(t *testing.T) { }, "failed to seed session_id: %v", err) } - candidate, err := db.FindReusableSessionCandidate(repo.ID, "feature/session", "test", config.ReviewTypeDefault) + candidate, err := db.FindReusableSessionCandidate(repo.ID, "feature/session", "test", config.ReviewTypeDefault, "") if err != nil { require.Condition(t, func() bool { return false @@ -625,7 +625,7 @@ func TestHandleEnqueueReusesPreviousBranchSessionWhenEnabled(t *testing.T) { require.NotNil(t, candidate, "expected reusable session candidate") assert.Equal(t, "session-123", candidate.SessionID, "candidate session_id") - reused := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, sha) + reused := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", sha) if reused != "session-123" { require.Condition(t, func() bool { return false @@ -780,7 +780,7 @@ func TestFindReusableSessionIDRejectsReusedBranchNameFromUnrelatedHistory(t *tes } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "" { require.Condition(t, func() bool { return false }, "findReusableSessionID() = %q, want empty for unrelated history", got) @@ -873,7 +873,7 @@ func TestFindReusableSessionIDRejectsCandidateThatIsTooOldOnBranch(t *testing.T) } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "" { require.Condition(t, func() bool { return false }, "findReusableSessionID() = %q, want empty for old candidate", got) @@ -1013,7 +1013,7 @@ func TestFindReusableSessionIDFallsBackToOlderValidCandidate(t *testing.T) { } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() = %q, want %q", got, "session-valid") @@ -1161,21 +1161,21 @@ func TestFindReusableSessionIDUsesConfigurableLookback(t *testing.T) { } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with default lookback = %q, want %q", got, "session-valid") } server.configWatcher.Config().ReuseReviewSessionLookback = 10 - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with capped lookback = %q, want empty", got) } server.configWatcher.Config().ReuseReviewSessionLookback = 12 - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with expanded lookback = %q, want %q", got, "session-valid") @@ -1305,7 +1305,7 @@ func TestFindReusableSessionIDLookbackIgnoresUnusableRefs(t *testing.T) { }, "failed to seed malformed session candidate: %v", err) } - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with unusable newer refs = %q, want %q", got, "session-valid") @@ -1407,7 +1407,7 @@ func TestFindReusableSessionIDSkipsInvalidStoredSessionID(t *testing.T) { }, "failed to seed invalid session_id: %v", err) } - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with invalid stored session_id = %q, want %q", got, "session-valid") diff --git a/internal/storage/reviews.go b/internal/storage/reviews.go index c6359e5dd..850ff6630 100644 --- a/internal/storage/reviews.go +++ b/internal/storage/reviews.go @@ -135,7 +135,7 @@ func (db *DB) GetRecentReviewsForRepo(repoID int64, limit int) ([]Review, error) // FindReusableSessionCandidates returns recent completed jobs with reusable // sessions for the same repo, branch, agent, and review type, newest first. func (db *DB) FindReusableSessionCandidates( - repoID int64, branch, agent, reviewType string, limit int, + repoID int64, branch, agent, reviewType, worktreePath string, limit int, ) ([]ReviewJob, error) { if repoID == 0 || branch == "" || agent == "" { return nil, nil @@ -153,8 +153,9 @@ func (db *DB) FindReusableSessionCandidates( AND session_id IS NOT NULL AND session_id <> '' AND COALESCE(NULLIF(review_type, ''), 'default') = ? + AND COALESCE(worktree_path, '') = ? ORDER BY COALESCE(finished_at, updated_at, enqueued_at) DESC, id DESC` - baseArgs := []any{repoID, branch, agent, reviewType} + baseArgs := []any{repoID, branch, agent, reviewType, worktreePath} if limit <= 0 { jobs, _, err := db.scanReusableSessionCandidates(query, baseArgs, 0) return jobs, err @@ -180,9 +181,9 @@ func (db *DB) FindReusableSessionCandidates( // FindReusableSessionCandidate returns the newest reusable session candidate. func (db *DB) FindReusableSessionCandidate( - repoID int64, branch, agent, reviewType string, + repoID int64, branch, agent, reviewType, worktreePath string, ) (*ReviewJob, error) { - jobs, err := db.FindReusableSessionCandidates(repoID, branch, agent, reviewType, 1) + jobs, err := db.FindReusableSessionCandidates(repoID, branch, agent, reviewType, worktreePath, 1) if err != nil { return nil, err } From 71fd19490caa3cf985d5b2034102203232002142 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 23 Mar 2026 20:45:13 -0500 Subject: [PATCH 9/9] fix(daemon): resolve session reuse config from checkout root (#1) Pass checkoutRoot instead of repoRoot to findReusableSessionID so reuse_review_session and lookback settings are read from the worktree's .roborev.toml, consistent with branch exclusion. Co-Authored-By: Claude Opus 4.6 (1M context) --- internal/daemon/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 15a746ccd..a8ef74e7f 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -855,7 +855,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { RepoID: repo.ID, GitRef: gitRef, Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), Agent: agentName, Model: model, Reasoning: reasoning, @@ -930,7 +930,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { RepoID: repo.ID, GitRef: fullRef, Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), Agent: agentName, Model: model, Reasoning: reasoning, @@ -981,7 +981,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { CommitID: commit.ID, GitRef: sha, Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), Agent: agentName, Model: model, Reasoning: reasoning,