diff --git a/internal/daemon/broadcaster.go b/internal/daemon/broadcaster.go index ff88162a9..39e86acd1 100644 --- a/internal/daemon/broadcaster.go +++ b/internal/daemon/broadcaster.go @@ -8,16 +8,17 @@ import ( // Event represents a review event that can be broadcast type Event struct { - Type string `json:"type"` - TS time.Time `json:"ts"` - JobID int64 `json:"job_id"` - Repo string `json:"repo"` - RepoName string `json:"repo_name"` - SHA string `json:"sha"` - Agent string `json:"agent,omitempty"` - Verdict string `json:"verdict,omitempty"` - Findings string `json:"findings,omitempty"` - Error string `json:"error,omitempty"` + Type string `json:"type"` + TS time.Time `json:"ts"` + JobID int64 `json:"job_id"` + Repo string `json:"repo"` + RepoName string `json:"repo_name"` + SHA string `json:"sha"` + Agent string `json:"agent,omitempty"` + Verdict string `json:"verdict,omitempty"` + Findings string `json:"findings,omitempty"` + Error string `json:"error,omitempty"` + WorktreePath string `json:"worktree_path,omitempty"` } // Subscriber represents a client subscribed to events @@ -112,26 +113,28 @@ func (b *EventBroadcaster) SubscriberCount() int { // MarshalJSON converts an Event to JSON for streaming func (e Event) MarshalJSON() ([]byte, error) { return json.Marshal(struct { - Type string `json:"type"` - TS string `json:"ts"` - JobID int64 `json:"job_id"` - Repo string `json:"repo"` - RepoName string `json:"repo_name"` - SHA string `json:"sha"` - Agent string `json:"agent,omitempty"` - Verdict string `json:"verdict,omitempty"` - Findings string `json:"findings,omitempty"` - Error string `json:"error,omitempty"` + Type string `json:"type"` + TS string `json:"ts"` + JobID int64 `json:"job_id"` + Repo string `json:"repo"` + RepoName string `json:"repo_name"` + SHA string `json:"sha"` + Agent string `json:"agent,omitempty"` + Verdict string `json:"verdict,omitempty"` + Findings string `json:"findings,omitempty"` + Error string `json:"error,omitempty"` + WorktreePath 
string `json:"worktree_path,omitempty"` }{ - Type: e.Type, - TS: e.TS.UTC().Format(time.RFC3339), - JobID: e.JobID, - Repo: e.Repo, - RepoName: e.RepoName, - SHA: e.SHA, - Agent: e.Agent, - Verdict: e.Verdict, - Findings: e.Findings, - Error: e.Error, + Type: e.Type, + TS: e.TS.UTC().Format(time.RFC3339), + JobID: e.JobID, + Repo: e.Repo, + RepoName: e.RepoName, + SHA: e.SHA, + Agent: e.Agent, + Verdict: e.Verdict, + Findings: e.Findings, + Error: e.Error, + WorktreePath: e.WorktreePath, }) } diff --git a/internal/daemon/event_test.go b/internal/daemon/event_test.go index 74e406cd2..e3b7593cb 100644 --- a/internal/daemon/event_test.go +++ b/internal/daemon/event_test.go @@ -53,4 +53,29 @@ func TestEvent_MarshalJSON(t *testing.T) { // Explicitly check that 'error' is not present _, hasError := decoded["error"] assert.False(t, hasError, "expected 'error' field to be omitted") + + // WorktreePath omitted when empty + _, hasWT := decoded["worktree_path"] + assert.False(t, hasWT, "expected 'worktree_path' to be omitted when empty") +} + +func TestEvent_MarshalJSON_WorktreePath(t *testing.T) { + event := Event{ + Type: "review.completed", + TS: time.Date(2026, 1, 11, 10, 0, 30, 0, time.UTC), + JobID: 42, + Repo: "/path/to/myrepo", + RepoName: "myrepo", + SHA: "abc123", + Agent: "claude-code", + WorktreePath: "/worktrees/feature-branch", + } + + data, err := event.MarshalJSON() + require.NoError(t, err) + + var decoded map[string]any + require.NoError(t, json.Unmarshal(data, &decoded)) + + assert.Equal(t, "/worktrees/feature-branch", decoded["worktree_path"]) } diff --git a/internal/daemon/hooks.go b/internal/daemon/hooks.go index 879645773..f2fe1ae81 100644 --- a/internal/daemon/hooks.go +++ b/internal/daemon/hooks.go @@ -136,11 +136,20 @@ func (hr *HookRunner) handleEvent(event Event) { return } - // Collect hooks: copy global slice to avoid aliasing, then append repo-specific - hooks := append([]config.HookConfig{}, cfg.Hooks...) 
+ // Resolve one effective repo path: prefer the worktree if it still + // exists and belongs to the same repository. Used for both config + // loading (.roborev.toml) and as the hook working directory. + effectiveRepo := event.Repo + if event.WorktreePath != "" && event.Repo != "" { + if gitpkg.ValidateWorktreeForRepo(event.WorktreePath, event.Repo) { + effectiveRepo = event.WorktreePath + } + } - if event.Repo != "" { - if repoCfg, err := config.LoadRepoConfig(event.Repo); err == nil && repoCfg != nil { + // Collect hooks: copy global slice to avoid aliasing, then append repo-specific. + hooks := append([]config.HookConfig{}, cfg.Hooks...) + if effectiveRepo != "" { + if repoCfg, err := config.LoadRepoConfig(effectiveRepo); err == nil && repoCfg != nil { hooks = append(hooks, repoCfg.Hooks...) } } @@ -170,7 +179,7 @@ func (hr *HookRunner) handleEvent(event Event) { fired++ // Run async so hooks don't block workers hr.wg.Add(1) - go hr.runHook(cmd, event.Repo) + go hr.runHook(cmd, effectiveRepo) } if fired > 0 { diff --git a/internal/daemon/server.go b/internal/daemon/server.go index 4560fd552..a8ef74e7f 100644 --- a/internal/daemon/server.go +++ b/internal/daemon/server.go @@ -691,9 +691,9 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } req.ReviewType = canonical[0] - // Get the working directory root for git commands (may be a worktree) - // This is needed to resolve refs like HEAD correctly in the worktree context - gitCwd, err := git.GetRepoRoot(req.RepoPath) + // Get the checkout root (via --show-toplevel) for git commands. + // For worktrees this is the worktree root; for the main repo it equals repoRoot. 
+ checkoutRoot, err := git.GetRepoRoot(req.RepoPath) if err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("not a git repository: %v", err)) return @@ -707,9 +707,17 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } + // Detect worktree: if the worktree toplevel differs from the main + // repo root, the request originated from a worktree checkout. + // Clean both paths to avoid false positives from normalization differences. + var worktreePath string + if filepath.Clean(checkoutRoot) != filepath.Clean(repoRoot) { + worktreePath = filepath.Clean(checkoutRoot) + } + // Check if branch is excluded from reviews - currentBranch := git.GetCurrentBranch(gitCwd) - if currentBranch != "" && config.IsBranchExcluded(repoRoot, currentBranch) { + currentBranch := git.GetCurrentBranch(checkoutRoot) + if currentBranch != "" && config.IsBranchExcluded(checkoutRoot, currentBranch) { // Silently skip excluded branches - return 200 OK with skipped flag writeJSON(w, map[string]any{ "skipped": true, @@ -834,6 +842,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { Label: gitRef, // Use git_ref as TUI label (run, analyze type, custom) JobType: req.JobType, Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue prompt job: %v", err)) @@ -841,18 +850,19 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } } else if isDirty { // Dirty review - use pre-captured diff - targetSHA, _ := git.ResolveSHA(gitCwd, "HEAD") + targetSHA, _ := git.ResolveSHA(checkoutRoot, "HEAD") job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: gitRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, targetSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - DiffContent: req.DiffContent, - Provider: 
req.Provider, + RepoID: repo.ID, + GitRef: gitRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, targetSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + DiffContent: req.DiffContent, + Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue dirty job: %v", err)) @@ -860,16 +870,16 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } } else if isRange { // For ranges, resolve both endpoints and create range job - // Use gitCwd to resolve refs correctly in worktree context + // Use checkoutRoot to resolve refs correctly in worktree context parts := strings.SplitN(gitRef, "..", 2) - startSHA, err := git.ResolveSHA(gitCwd, parts[0]) + startSHA, err := git.ResolveSHA(checkoutRoot, parts[0]) if err != nil { // If the start ref is ^ and resolution failed, the commit // may be the root commit (no parent). Use the empty tree SHA so // the range includes the root commit's changes. if before, ok := strings.CutSuffix(parts[0], "^"); ok { base := before - if _, resolveErr := git.ResolveSHA(gitCwd, base+"^{commit}"); resolveErr == nil { + if _, resolveErr := git.ResolveSHA(checkoutRoot, base+"^{commit}"); resolveErr == nil { startSHA = git.EmptyTreeSHA err = nil } @@ -879,7 +889,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } } - endSHA, err := git.ResolveSHA(gitCwd, parts[1]) + endSHA, err := git.ResolveSHA(checkoutRoot, parts[1]) if err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid end commit: %v", err)) return @@ -891,7 +901,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { // means we can't prove all are excluded. fullRef := startSHA + ".." 
+ endSHA if rangeCommits, rcErr := git.GetRangeCommits( - gitCwd, fullRef, + checkoutRoot, fullRef, ); rcErr == nil && len(rangeCommits) > 0 { messages := make([]string, 0, len(rangeCommits)) allRead := true @@ -917,23 +927,24 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - GitRef: fullRef, - Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, endSHA), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - Provider: req.Provider, + RepoID: repo.ID, + GitRef: fullRef, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, endSHA), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) return } } else { - // Single commit - use gitCwd to resolve refs correctly in worktree context - sha, err := git.ResolveSHA(gitCwd, gitRef) + // Single commit - use checkoutRoot to resolve refs correctly in worktree context + sha, err := git.ResolveSHA(checkoutRoot, gitRef) if err != nil { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid commit: %v", err)) return @@ -963,20 +974,21 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { return } - patchID := git.GetPatchID(gitCwd, sha) + patchID := git.GetPatchID(checkoutRoot, sha) job, err = s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: repo.ID, - CommitID: commit.ID, - GitRef: sha, - Branch: req.Branch, - SessionID: s.findReusableSessionID(repoRoot, repo.ID, req.Branch, agentName, req.ReviewType, sha), - Agent: agentName, - Model: model, - Reasoning: reasoning, - ReviewType: req.ReviewType, - PatchID: patchID, - Provider: req.Provider, + 
RepoID: repo.ID, + CommitID: commit.ID, + GitRef: sha, + Branch: req.Branch, + SessionID: s.findReusableSessionID(checkoutRoot, repo.ID, req.Branch, agentName, req.ReviewType, worktreePath, sha), + Agent: agentName, + Model: model, + Reasoning: reasoning, + ReviewType: req.ReviewType, + PatchID: patchID, + Provider: req.Provider, + WorktreePath: worktreePath, }) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("enqueue job: %v", err)) @@ -1008,7 +1020,7 @@ func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { } func (s *Server) findReusableSessionID( - repoPath string, repoID int64, branch, agentName, reviewType, targetSHA string, + repoPath string, repoID int64, branch, agentName, reviewType, worktreePath, targetSHA string, ) string { cfg := s.configWatcher.Config() if !config.ResolveReuseReviewSession(repoPath, cfg) || branch == "" || targetSHA == "" { @@ -1020,6 +1032,7 @@ func (s *Server) findReusableSessionID( branch, agentName, reviewType, + worktreePath, config.ResolveReuseReviewSessionLookback(repoPath, cfg), ) if err != nil { @@ -2332,18 +2345,19 @@ func (s *Server) handleFixJob(w http.ResponseWriter, r *http.Request) { // Enqueue the fix job job, err := s.db.EnqueueJob(storage.EnqueueOpts{ - RepoID: parentJob.RepoID, - CommitID: commitID, - GitRef: fixGitRef, - Branch: parentJob.Branch, - Agent: agentName, - Model: model, - Reasoning: reasoning, - Prompt: fixPrompt, - Agentic: true, - Label: fmt.Sprintf("fix #%d", req.ParentJobID), - JobType: storage.JobTypeFix, - ParentJobID: req.ParentJobID, + RepoID: parentJob.RepoID, + CommitID: commitID, + GitRef: fixGitRef, + Branch: parentJob.Branch, + Agent: agentName, + Model: model, + Reasoning: reasoning, + Prompt: fixPrompt, + Agentic: true, + Label: fmt.Sprintf("fix #%d", req.ParentJobID), + JobType: storage.JobTypeFix, + ParentJobID: req.ParentJobID, + WorktreePath: parentJob.WorktreePath, }) if err != nil { s.writeInternalError(w, fmt.Sprintf("enqueue fix job: 
%v", err)) diff --git a/internal/daemon/server_jobs_test.go b/internal/daemon/server_jobs_test.go index 8e9a1034e..034ad81c0 100644 --- a/internal/daemon/server_jobs_test.go +++ b/internal/daemon/server_jobs_test.go @@ -616,7 +616,7 @@ func TestHandleEnqueueReusesPreviousBranchSessionWhenEnabled(t *testing.T) { }, "failed to seed session_id: %v", err) } - candidate, err := db.FindReusableSessionCandidate(repo.ID, "feature/session", "test", config.ReviewTypeDefault) + candidate, err := db.FindReusableSessionCandidate(repo.ID, "feature/session", "test", config.ReviewTypeDefault, "") if err != nil { require.Condition(t, func() bool { return false @@ -625,7 +625,7 @@ func TestHandleEnqueueReusesPreviousBranchSessionWhenEnabled(t *testing.T) { require.NotNil(t, candidate, "expected reusable session candidate") assert.Equal(t, "session-123", candidate.SessionID, "candidate session_id") - reused := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, sha) + reused := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", sha) if reused != "session-123" { require.Condition(t, func() bool { return false @@ -780,7 +780,7 @@ func TestFindReusableSessionIDRejectsReusedBranchNameFromUnrelatedHistory(t *tes } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "" { require.Condition(t, func() bool { return false }, "findReusableSessionID() = %q, want empty for unrelated history", got) @@ -873,7 +873,7 @@ func TestFindReusableSessionIDRejectsCandidateThatIsTooOldOnBranch(t *testing.T) } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", 
config.ReviewTypeDefault, targetSHA); got != "" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "" { require.Condition(t, func() bool { return false }, "findReusableSessionID() = %q, want empty for old candidate", got) @@ -1013,7 +1013,7 @@ func TestFindReusableSessionIDFallsBackToOlderValidCandidate(t *testing.T) { } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() = %q, want %q", got, "session-valid") @@ -1161,21 +1161,21 @@ func TestFindReusableSessionIDUsesConfigurableLookback(t *testing.T) { } targetSHA := testutil.GetHeadSHA(t, repoDir) - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with default lookback = %q, want %q", got, "session-valid") } server.configWatcher.Config().ReuseReviewSessionLookback = 10 - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with capped lookback = %q, want empty", got) } server.configWatcher.Config().ReuseReviewSessionLookback = 12 - if got := 
server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with expanded lookback = %q, want %q", got, "session-valid") @@ -1305,7 +1305,7 @@ func TestFindReusableSessionIDLookbackIgnoresUnusableRefs(t *testing.T) { }, "failed to seed malformed session candidate: %v", err) } - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with unusable newer refs = %q, want %q", got, "session-valid") @@ -1407,7 +1407,7 @@ func TestFindReusableSessionIDSkipsInvalidStoredSessionID(t *testing.T) { }, "failed to seed invalid session_id: %v", err) } - if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, targetSHA); got != "session-valid" { + if got := server.findReusableSessionID(repoRoot, repo.ID, "feature/session", "test", config.ReviewTypeDefault, "", targetSHA); got != "session-valid" { require.Condition(t, func() bool { return false }, "findReusableSessionID() with invalid stored session_id = %q, want %q", got, "session-valid") diff --git a/internal/daemon/server_ops_test.go b/internal/daemon/server_ops_test.go index 64845d5e1..1b0bcc8f6 100644 --- a/internal/daemon/server_ops_test.go +++ b/internal/daemon/server_ops_test.go @@ -597,6 +597,50 @@ func TestHandleFixJobStaleValidation(t *testing.T) { require.Equal(t, commit.Subject, stored.CommitSubject) }) + t.Run("fix job inherits parent worktree 
path", func(t *testing.T) { + // Enqueue a review with a worktree path + wtJob, err := db.EnqueueJob(storage.EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "fix-val-abc", + Agent: "test", + WorktreePath: filepath.Join(tmpDir, "worktrees", "feature"), + }) + require.NoError(t, err) + + // Claim jobs until we get ours (earlier subtests may leave queued jobs) + for { + claimed, claimErr := db.ClaimJob("w-wt-fix") + require.NoError(t, claimErr) + require.NotNil(t, claimed) + if claimed.ID == wtJob.ID { + break + } + db.CompleteJob(claimed.ID, "test", "prompt", "PASS") + } + require.NoError(t, db.CompleteJob( + wtJob.ID, "test", "prompt", "FAIL: issues found", + )) + + req := testutil.MakeJSONRequest( + t, http.MethodPost, "/api/job/fix", + fixJobRequest{ParentJobID: wtJob.ID}, + ) + w := httptest.NewRecorder() + server.handleFixJob(w, req) + assertHandlerStatus(t, w, http.StatusCreated) + + var fixJob storage.ReviewJob + testutil.DecodeJSON(t, w, &fixJob) + + stored, err := db.GetJobByID(fixJob.ID) + require.NoError(t, err) + require.Equal(t, + filepath.Join(tmpDir, "worktrees", "feature"), + stored.WorktreePath, + ) + }) + t.Run("custom prompt includes review context", func(t *testing.T) { req := testutil.MakeJSONRequest( t, http.MethodPost, "/api/job/fix", diff --git a/internal/daemon/worker.go b/internal/daemon/worker.go index cd1c99d6b..d01695d54 100644 --- a/internal/daemon/worker.go +++ b/internal/daemon/worker.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log" + "os" "strings" "sync" "sync/atomic" @@ -361,6 +362,18 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { return } + // Resolve effective repo path: use worktree if available, still exists, + // and is a valid git checkout for the same repository. 
+ effectiveRepoPath := job.RepoPath + if job.WorktreePath != "" { + if !gitpkg.ValidateWorktreeForRepo(job.WorktreePath, job.RepoPath) { + log.Printf("[%s] Worktree %s invalid or gone for job %d, using main repo", + workerID, job.WorktreePath, job.ID) + } else { + effectiveRepoPath = job.WorktreePath + } + } + // Build the prompt (or use pre-stored prompt for task/compact jobs). // Create a per-job builder with the snapshotted config so exclude // patterns are resolved consistently. @@ -382,10 +395,10 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { err = fmt.Errorf("%s job %d has no stored prompt (git_ref=%q); restart the daemon with 'roborev daemon restart'", job.JobType, job.ID, job.GitRef) } else if job.DiffContent != nil { // Dirty job - use pre-captured diff - reviewPrompt, err = pb.BuildDirty(job.RepoPath, *job.DiffContent, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) + reviewPrompt, err = pb.BuildDirty(effectiveRepoPath, *job.DiffContent, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) } else { // Normal job - build prompt from git ref - reviewPrompt, err = pb.Build(job.RepoPath, job.GitRef, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) + reviewPrompt, err = pb.Build(effectiveRepoPath, job.GitRef, job.RepoID, cfg.ReviewContextCount, job.Agent, job.ReviewType) } if err != nil { log.Printf("[%s] Error building prompt: %v", workerID, err) @@ -435,15 +448,22 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { log.Printf("[%s] Agent %s not available, using %s", workerID, job.Agent, agentName) } + // Use the effective worktree path for events (empty when worktree is gone or not a worktree job). 
+ eventWorktreePath := "" + if effectiveRepoPath != job.RepoPath { + eventWorktreePath = effectiveRepoPath + } + // Broadcast started event wp.broadcaster.Broadcast(Event{ - Type: "review.started", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, + Type: "review.started", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + WorktreePath: eventWorktreePath, }) // Create output writer for tail command @@ -473,7 +493,7 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { // For fix jobs, create an isolated worktree to run the agent in. // The agent modifies files in the worktree; afterwards we capture the diff as a patch. - reviewRepoPath := job.RepoPath + reviewRepoPath := effectiveRepoPath var fixWorktree *worktree.Worktree if job.IsFixJob() { wt, wtErr := worktree.Create(job.RepoPath, job.GitRef) @@ -504,13 +524,14 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { log.Printf("[%s] Job %d was canceled", workerID, job.ID) // Broadcast cancellation event wp.broadcaster.Broadcast(Event{ - Type: "review.canceled", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, + Type: "review.canceled", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + WorktreePath: eventWorktreePath, }) return // Job already marked as canceled in DB, nothing more to do } @@ -633,15 +654,16 @@ func (wp *WorkerPool) processJob(workerID string, job *storage.ReviewJob) { // Broadcast completion event verdict := storage.ParseVerdict(output) wp.broadcaster.Broadcast(Event{ - Type: "review.completed", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, - Verdict: verdict, - Findings: output, + Type: 
"review.completed", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + Verdict: verdict, + Findings: output, + WorktreePath: eventWorktreePath, }) } @@ -770,15 +792,22 @@ func (wp *WorkerPool) resolveBackupModel(job *storage.ReviewJob) string { // broadcastFailed sends a review.failed event for a job func (wp *WorkerPool) broadcastFailed(job *storage.ReviewJob, agentName, errorMsg string) { + wtPath := "" + if job.WorktreePath != "" { + if _, err := os.Stat(job.WorktreePath); err == nil { + wtPath = job.WorktreePath + } + } wp.broadcaster.Broadcast(Event{ - Type: "review.failed", - TS: time.Now(), - JobID: job.ID, - Repo: job.RepoPath, - RepoName: job.RepoName, - SHA: job.GitRef, - Agent: agentName, - Error: errorMsg, + Type: "review.failed", + TS: time.Now(), + JobID: job.ID, + Repo: job.RepoPath, + RepoName: job.RepoName, + SHA: job.GitRef, + Agent: agentName, + Error: errorMsg, + WorktreePath: wtPath, }) } diff --git a/internal/git/git.go b/internal/git/git.go index 8c2226c26..143eeec50 100644 --- a/internal/git/git.go +++ b/internal/git/git.go @@ -237,6 +237,37 @@ func GetRepoRoot(path string) (string, error) { return normalizeMSYSPath(string(out)), nil } +// ValidateWorktreeForRepo checks that worktreePath is a git checkout +// whose main repository root matches repoRoot. Returns true if the +// worktree is valid for the given repo. Returns false (without error) +// if the path doesn't exist, isn't a git repo, or belongs to a +// different repository. +func ValidateWorktreeForRepo(worktreePath, repoRoot string) bool { + // Verify the path is itself a checkout root (not a subdirectory). + toplevel, err := GetRepoRoot(worktreePath) + if err != nil { + return false + } + if cleanEvalPath(toplevel) != cleanEvalPath(worktreePath) { + return false + } + // Verify the checkout belongs to the same main repository. 
+ mainRoot, err := GetMainRepoRoot(worktreePath) + if err != nil { + return false + } + return cleanEvalPath(mainRoot) == cleanEvalPath(repoRoot) +} + +// cleanEvalPath resolves symlinks and cleans the path for comparison. +// Falls back to filepath.Clean if symlink resolution fails. +func cleanEvalPath(p string) string { + if resolved, err := filepath.EvalSymlinks(p); err == nil { + return resolved + } + return filepath.Clean(p) +} + // GetMainRepoRoot returns the main repository root, resolving through worktrees. // For a regular repository or submodule, this returns the same as GetRepoRoot. // For a worktree, this returns the main repository's root path. diff --git a/internal/git/git_test.go b/internal/git/git_test.go index 553e227d8..0553fd135 100644 --- a/internal/git/git_test.go +++ b/internal/git/git_test.go @@ -1535,3 +1535,43 @@ func TestWorktreePathForBranch(t *testing.T) { } }) } + +func TestValidateWorktreeForRepo(t *testing.T) { + if _, err := exec.LookPath("git"); err != nil { + t.Skip("git not found") + } + + t.Run("valid worktree passes", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + wt := repo.AddWorktree("val-wt") + + assert.True(t, ValidateWorktreeForRepo(wt.Dir, repo.Dir)) + }) + + t.Run("main repo validates against itself", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + + assert.True(t, ValidateWorktreeForRepo(repo.Dir, repo.Dir)) + }) + + t.Run("nonexistent path fails", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + + assert.False(t, ValidateWorktreeForRepo("/nonexistent/path", repo.Dir)) + }) + + t.Run("unrelated repo fails", func(t *testing.T) { + repo1 := NewTestRepoWithCommit(t) + repo2 := NewTestRepoWithCommit(t) + + assert.False(t, ValidateWorktreeForRepo(repo2.Dir, repo1.Dir)) + }) + + t.Run("subdirectory of same repo fails", func(t *testing.T) { + repo := NewTestRepoWithCommit(t) + subDir := filepath.Join(repo.Dir, "src") + require.NoError(t, os.MkdirAll(subDir, 0o755)) + + assert.False(t, 
ValidateWorktreeForRepo(subDir, repo.Dir)) + }) +} diff --git a/internal/storage/db.go b/internal/storage/db.go index 222fa7391..f0219eed7 100644 --- a/internal/storage/db.go +++ b/internal/storage/db.go @@ -707,6 +707,18 @@ func (db *DB) migrate() error { } } + // Migration: add worktree_path column to review_jobs if missing + err = db.QueryRow(`SELECT COUNT(*) FROM pragma_table_info('review_jobs') WHERE name = 'worktree_path'`).Scan(&count) + if err != nil { + return fmt.Errorf("check worktree_path column: %w", err) + } + if count == 0 { + _, err = db.Exec(`ALTER TABLE review_jobs ADD COLUMN worktree_path TEXT DEFAULT ''`) + if err != nil { + return fmt.Errorf("add worktree_path column: %w", err) + } + } + // Run sync-related migrations if err := db.migrateSyncColumns(); err != nil { return err diff --git a/internal/storage/db_job_test.go b/internal/storage/db_job_test.go index 00dedd0e5..55697b014 100644 --- a/internal/storage/db_job_test.go +++ b/internal/storage/db_job_test.go @@ -871,6 +871,39 @@ func TestReenqueueJob(t *testing.T) { require.Error(t, err) }) + t.Run("rerun preserves worktree_path", func(t *testing.T) { + isolatedDB := openTestDB(t) + defer isolatedDB.Close() + + repo := createRepo(t, isolatedDB, "/tmp/wt-preserve-repo") + commit := createCommit(t, isolatedDB, repo.ID, "wt-preserve-sha") + + job, err := isolatedDB.EnqueueJob(EnqueueOpts{ + RepoID: repo.ID, + CommitID: commit.ID, + GitRef: "wt-preserve-sha", + Agent: "test", + WorktreePath: "/tmp/wt/feature-branch", + }) + require.NoError(t, err) + + claimed, err := isolatedDB.ClaimJob("worker-1") + require.NoError(t, err) + require.NotNil(t, claimed) + assert.Equal(t, job.ID, claimed.ID) + + err = isolatedDB.CompleteJob(job.ID, "test", "prompt", "output") + require.NoError(t, err) + + err = isolatedDB.ReenqueueJob(job.ID) + require.NoError(t, err) + + updated, err := isolatedDB.GetJobByID(job.ID) + require.NoError(t, err) + assert.Equal(t, JobStatusQueued, updated.Status) + assert.Equal(t, 
"/tmp/wt/feature-branch", updated.WorktreePath) + }) + t.Run("rerun done job and complete again", func(t *testing.T) { // Use isolated database to avoid interference from other subtests isolatedDB := openTestDB(t) diff --git a/internal/storage/hydration.go b/internal/storage/hydration.go index 16591fe0c..804324f35 100644 --- a/internal/storage/hydration.go +++ b/internal/storage/hydration.go @@ -31,6 +31,7 @@ type reviewJobScanFields struct { TokenUsage sql.NullString Agentic int Closed sql.NullInt64 + WorktreePath string } func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { @@ -107,6 +108,7 @@ func applyReviewJobScan(job *ReviewJob, fields reviewJobScanFields) { closed := fields.Closed.Int64 != 0 job.Closed = &closed } + job.WorktreePath = fields.WorktreePath } type reviewScanFields struct { diff --git a/internal/storage/jobs.go b/internal/storage/jobs.go index f28703c70..cf4f69cd1 100644 --- a/internal/storage/jobs.go +++ b/internal/storage/jobs.go @@ -58,6 +58,7 @@ type EnqueueOpts struct { Label string // Display label in TUI for task jobs (default: "prompt") JobType string // Explicit job type (review/range/dirty/task/compact/fix); inferred if empty ParentJobID int64 // Parent job being fixed (for fix jobs) + WorktreePath string // Worktree checkout path (empty = use main repo root) } // EnqueueJob creates a new review job. The job type is inferred from opts. 
@@ -119,14 +120,14 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { result, err := db.Exec(` INSERT INTO review_jobs (repo_id, commit_id, git_ref, branch, session_id, agent, model, provider, reasoning, status, job_type, review_type, patch_id, diff_content, prompt, agentic, output_prefix, - parent_job_id, uuid, source_machine_id, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + parent_job_id, uuid, source_machine_id, updated_at, worktree_path) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, opts.RepoID, commitIDParam, gitRef, nullString(opts.Branch), nullString(opts.SessionID), opts.Agent, nullString(opts.Model), nullString(opts.Provider), reasoning, jobType, opts.ReviewType, nullString(opts.PatchID), nullString(opts.DiffContent), nullString(opts.Prompt), agenticInt, nullString(opts.OutputPrefix), parentJobIDParam, - uid, machineID, nowStr) + uid, machineID, nowStr, opts.WorktreePath) if err != nil { return nil, err } @@ -153,6 +154,7 @@ func (db *DB) EnqueueJob(opts EnqueueOpts) (*ReviewJob, error) { UUID: uid, SourceMachineID: machineID, UpdatedAt: &now, + WorktreePath: opts.WorktreePath, } if opts.ParentJobID > 0 { job.ParentJobID = &opts.ParentJobID @@ -202,7 +204,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { err = db.QueryRow(` SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.model, j.provider, j.reasoning, j.status, j.enqueued_at, r.root_path, r.name, c.subject, j.diff_content, j.prompt, COALESCE(j.agentic, 0), j.job_type, j.review_type, - j.output_prefix, j.patch_id, j.parent_job_id + j.output_prefix, j.patch_id, j.parent_job_id, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -211,7 +213,7 @@ func (db *DB) ClaimJob(workerID string) (*ReviewJob, error) { LIMIT 1 `, workerID).Scan(&job.ID, &job.RepoID, &fields.CommitID, 
&job.GitRef, &fields.Branch, &fields.SessionID, &job.Agent, &fields.Model, &fields.Provider, &job.Reasoning, &job.Status, &fields.EnqueuedAt, &job.RepoPath, &job.RepoName, &fields.CommitSubject, &fields.DiffContent, &fields.Prompt, &fields.Agentic, &fields.JobType, &fields.ReviewType, - &fields.OutputPrefix, &fields.PatchID, &fields.ParentJobID) + &fields.OutputPrefix, &fields.PatchID, &fields.ParentJobID, &fields.WorktreePath) if err != nil { return nil, err } @@ -769,7 +771,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, j.retry_count, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, rv.closed, rv.output, rv.verdict_bool, j.source_machine_id, j.uuid, j.model, j.job_type, j.review_type, j.patch_id, - j.parent_job_id, j.provider, j.token_usage + j.parent_job_id, j.provider, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -807,7 +809,7 @@ func (db *DB) ListJobs(statusFilter string, repoFilter string, limit, offset int &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &j.RetryCount, &fields.Agentic, &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Closed, &output, &verdictBool, &fields.SourceMachineID, &fields.UUID, &fields.Model, &fields.JobType, &fields.ReviewType, &fields.PatchID, - &fields.ParentJobID, &fields.Provider, &fields.TokenUsage) + &fields.ParentJobID, &fields.Provider, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err } @@ -857,7 +859,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { SELECT j.id, j.repo_id, j.commit_id, j.git_ref, j.branch, j.session_id, j.agent, j.reasoning, j.status, j.enqueued_at, j.started_at, j.finished_at, j.worker_id, j.error, j.prompt, COALESCE(j.agentic, 0), r.root_path, r.name, c.subject, j.model, j.provider, j.job_type, j.review_type, j.patch_id, - 
j.parent_job_id, j.patch, j.token_usage + j.parent_job_id, j.patch, j.token_usage, COALESCE(j.worktree_path, '') FROM review_jobs j JOIN repos r ON r.id = j.repo_id LEFT JOIN commits c ON c.id = j.commit_id @@ -865,7 +867,7 @@ func (db *DB) GetJobByID(id int64) (*ReviewJob, error) { `, id).Scan(&j.ID, &j.RepoID, &fields.CommitID, &j.GitRef, &fields.Branch, &fields.SessionID, &j.Agent, &j.Reasoning, &j.Status, &fields.EnqueuedAt, &fields.StartedAt, &fields.FinishedAt, &fields.WorkerID, &fields.Error, &fields.Prompt, &fields.Agentic, &j.RepoPath, &j.RepoName, &fields.CommitSubject, &fields.Model, &fields.Provider, &fields.JobType, &fields.ReviewType, &fields.PatchID, - &fields.ParentJobID, &fields.Patch, &fields.TokenUsage) + &fields.ParentJobID, &fields.Patch, &fields.TokenUsage, &fields.WorktreePath) if err != nil { return nil, err } diff --git a/internal/storage/models.go b/internal/storage/models.go index 1334df6d1..07fc9fadf 100644 --- a/internal/storage/models.go +++ b/internal/storage/models.go @@ -72,6 +72,7 @@ type ReviewJob struct { OutputPrefix string `json:"output_prefix,omitempty"` // Prefix to prepend to review output ParentJobID *int64 `json:"parent_job_id,omitempty"` // Job being fixed (for fix jobs) Patch *string `json:"patch,omitempty"` // Generated diff patch (for completed fix jobs) + WorktreePath string `json:"worktree_path,omitempty"` // Worktree checkout path (empty = use RepoPath) TokenUsage string `json:"token_usage,omitempty"` // JSON blob from agentsview (token consumption) // Sync fields UUID string `json:"uuid,omitempty"` // Globally unique identifier for sync diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 61298edfd..ac9193360 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -14,12 +14,12 @@ import ( ) // PostgreSQL schema version - increment when schema changes -const pgSchemaVersion = 8 +const pgSchemaVersion = 9 // pgSchemaName is the PostgreSQL schema used to isolate 
roborev tables const pgSchemaName = "roborev" -//go:embed schemas/postgres_v8.sql +//go:embed schemas/postgres_v9.sql var pgSchemaSQL string // pgSchemaStatements returns the individual DDL statements for schema creation. @@ -282,6 +282,12 @@ func (p *PgPool) EnsureSchema(ctx context.Context) error { return fmt.Errorf("migrate to v8 (add token_usage column): %w", err) } } + if currentVersion < 9 { + _, err = p.pool.Exec(ctx, `ALTER TABLE review_jobs ADD COLUMN IF NOT EXISTS worktree_path TEXT`) + if err != nil { + return fmt.Errorf("migrate to v9 (add worktree_path column): %w", err) + } + } // Update version _, err = p.pool.Exec(ctx, `INSERT INTO schema_version (version) VALUES ($1) ON CONFLICT (version) DO NOTHING`, pgSchemaVersion) if err != nil { @@ -535,8 +541,8 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p INSERT INTO review_jobs ( uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, - source_machine_id, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, NOW()) + worktree_path, source_machine_id, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, @@ -547,10 +553,11 @@ func (p *PgPool) UpsertJob(ctx context.Context, j SyncableJob, pgRepoID int64, p commit_id = EXCLUDED.commit_id, patch_id = EXCLUDED.patch_id, token_usage = COALESCE(EXCLUDED.token_usage, review_jobs.token_usage), + worktree_path = COALESCE(EXCLUDED.worktree_path, review_jobs.worktree_path), updated_at = NOW() `, j.UUID, pgRepoID, pgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Model), nullString(j.Reasoning), defaultStr(j.JobType, "review"), 
j.ReviewType, nullString(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt, j.StartedAt, j.FinishedAt, - nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), j.SourceMachineID) + nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), nullString(j.WorktreePath), j.SourceMachineID) return err } @@ -606,6 +613,7 @@ type PulledJob struct { DiffContent *string Error string TokenUsage string + WorktreePath string SourceMachineID string UpdatedAt time.Time } @@ -631,7 +639,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, j.started_at, j.finished_at, COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), - j.source_machine_id, j.updated_at, j.id + COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at, j.id FROM review_jobs j JOIN repos r ON j.repo_id = r.id LEFT JOIN commits c ON j.commit_id = c.id @@ -658,7 +666,7 @@ func (p *PgPool) PullJobs(ctx context.Context, excludeMachineID string, cursor s &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &j.EnqueuedAt, &j.StartedAt, &j.FinishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, - &j.SourceMachineID, &j.UpdatedAt, &lastID, + &j.WorktreePath, &j.SourceMachineID, &j.UpdatedAt, &lastID, ) if err != nil { return nil, cursor, fmt.Errorf("scan job: %w", err) @@ -926,8 +934,8 @@ func (p *PgPool) BatchUpsertJobs(ctx context.Context, jobs []JobWithPgIDs) ([]bo INSERT INTO review_jobs ( uuid, repo_id, commit_id, git_ref, session_id, agent, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, - source_machine_id, 
updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, NOW()) + worktree_path, source_machine_id, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, NOW()) ON CONFLICT (uuid) DO UPDATE SET status = EXCLUDED.status, finished_at = EXCLUDED.finished_at, @@ -937,10 +945,11 @@ func (p *PgPool) BatchUpsertJobs(ctx context.Context, jobs []JobWithPgIDs) ([]bo commit_id = EXCLUDED.commit_id, patch_id = EXCLUDED.patch_id, token_usage = COALESCE(EXCLUDED.token_usage, review_jobs.token_usage), + worktree_path = COALESCE(EXCLUDED.worktree_path, review_jobs.worktree_path), updated_at = NOW() `, j.UUID, jw.PgRepoID, jw.PgCommitID, j.GitRef, nullString(j.SessionID), j.Agent, nullString(j.Reasoning), defaultStr(j.JobType, "review"), j.ReviewType, nullString(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt, j.StartedAt, j.FinishedAt, - nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), j.SourceMachineID) + nullString(j.Prompt), j.DiffContent, nullString(j.Error), nullString(j.TokenUsage), nullString(j.WorktreePath), j.SourceMachineID) } br := p.pool.SendBatch(ctx, batch) diff --git a/internal/storage/postgres_test.go b/internal/storage/postgres_test.go index 5bcb454cf..577005f1a 100644 --- a/internal/storage/postgres_test.go +++ b/internal/storage/postgres_test.go @@ -725,6 +725,60 @@ func TestIntegration_BatchUpsertJobs(t *testing.T) { require.NoError(t, err) assert.Nil(t, success) }) + + t.Run("worktree_path round-trips through batch upsert and pull", func(t *testing.T) { + wtJobUUID := uuid.NewString() + // Use a distinct machine ID so the job is NOT filtered out when PullJobs below excludes a different machine + wtMachineID := uuid.NewString() + commitID := createTestCommit(t, pool.Pool(), TestCommitOpts{ + RepoID: repoID, SHA: "batch-wt-sha", + }) + wtJobs := []JobWithPgIDs{{ + Job: SyncableJob{ + UUID: wtJobUUID, + RepoIdentity: 
"https://github.com/test/batch-jobs-test.git", + CommitSHA: "batch-wt-sha", + GitRef: "test-ref", + Agent: "test", + Status: "done", + WorktreePath: "/worktrees/feature-x", + SourceMachineID: wtMachineID, + EnqueuedAt: time.Now(), + }, + PgRepoID: repoID, + PgCommitID: &commitID, + }} + + success, err := pool.BatchUpsertJobs(ctx, wtJobs) + require.NoError(t, err) + assert.Equal(t, 1, countSuccesses(success)) + + // Verify worktree_path directly in the database row + var storedWT *string + err = pool.pool.QueryRow(ctx, + `SELECT worktree_path FROM review_jobs WHERE uuid = $1`, wtJobUUID, + ).Scan(&storedWT) + require.NoError(t, err) + require.NotNil(t, storedWT) + assert.Equal(t, "/worktrees/feature-x", *storedWT) + + // Also verify PullJobs returns the field: use a different + // valid UUID to exclude, and scope via cursor to avoid + // scanning the entire table. + otherMachine := uuid.NewString() + pulled, _, err := pool.PullJobs(ctx, otherMachine, "", 100) + require.NoError(t, err) + + var found *PulledJob + for i := range pulled { + if pulled[i].UUID == wtJobUUID { + found = &pulled[i] + break + } + } + require.NotNil(t, found, "expected job %s in pull results", wtJobUUID) + assert.Equal(t, "/worktrees/feature-x", found.WorktreePath) + }) } func TestIntegration_BatchUpsertReviews(t *testing.T) { diff --git a/internal/storage/reviews.go b/internal/storage/reviews.go index c6359e5dd..850ff6630 100644 --- a/internal/storage/reviews.go +++ b/internal/storage/reviews.go @@ -135,7 +135,7 @@ func (db *DB) GetRecentReviewsForRepo(repoID int64, limit int) ([]Review, error) // FindReusableSessionCandidates returns recent completed jobs with reusable // sessions for the same repo, branch, agent, and review type, newest first. 
func (db *DB) FindReusableSessionCandidates( - repoID int64, branch, agent, reviewType string, limit int, + repoID int64, branch, agent, reviewType, worktreePath string, limit int, ) ([]ReviewJob, error) { if repoID == 0 || branch == "" || agent == "" { return nil, nil @@ -153,8 +153,9 @@ func (db *DB) FindReusableSessionCandidates( AND session_id IS NOT NULL AND session_id <> '' AND COALESCE(NULLIF(review_type, ''), 'default') = ? + AND COALESCE(worktree_path, '') = ? ORDER BY COALESCE(finished_at, updated_at, enqueued_at) DESC, id DESC` - baseArgs := []any{repoID, branch, agent, reviewType} + baseArgs := []any{repoID, branch, agent, reviewType, worktreePath} if limit <= 0 { jobs, _, err := db.scanReusableSessionCandidates(query, baseArgs, 0) return jobs, err @@ -180,9 +181,9 @@ func (db *DB) FindReusableSessionCandidates( // FindReusableSessionCandidate returns the newest reusable session candidate. func (db *DB) FindReusableSessionCandidate( - repoID int64, branch, agent, reviewType string, + repoID int64, branch, agent, reviewType, worktreePath string, ) (*ReviewJob, error) { - jobs, err := db.FindReusableSessionCandidates(repoID, branch, agent, reviewType, 1) + jobs, err := db.FindReusableSessionCandidates(repoID, branch, agent, reviewType, worktreePath, 1) if err != nil { return nil, err } diff --git a/internal/storage/schemas/postgres_v9.sql b/internal/storage/schemas/postgres_v9.sql new file mode 100644 index 000000000..9fe819a77 --- /dev/null +++ b/internal/storage/schemas/postgres_v9.sql @@ -0,0 +1,102 @@ +-- PostgreSQL schema version 9 +-- Adds worktree_path to review_jobs for worktree-aware job execution. +-- Note: Version is managed by EnsureSchema(), not this file. 
+ +CREATE SCHEMA IF NOT EXISTS roborev; + +CREATE TABLE IF NOT EXISTS roborev.schema_version ( + version INTEGER PRIMARY KEY, + applied_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.machines ( + id SERIAL PRIMARY KEY, + machine_id UUID UNIQUE NOT NULL, + name TEXT, + last_seen_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.repos ( + id SERIAL PRIMARY KEY, + identity TEXT UNIQUE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.commits ( + id SERIAL PRIMARY KEY, + repo_id INTEGER REFERENCES roborev.repos(id), + sha TEXT NOT NULL, + author TEXT NOT NULL, + subject TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + UNIQUE(repo_id, sha) +); + +CREATE TABLE IF NOT EXISTS roborev.review_jobs ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + repo_id INTEGER NOT NULL REFERENCES roborev.repos(id), + commit_id INTEGER REFERENCES roborev.commits(id), + git_ref TEXT NOT NULL, + branch TEXT, + session_id TEXT, + agent TEXT NOT NULL, + model TEXT, + reasoning TEXT, + job_type TEXT NOT NULL DEFAULT 'review', + review_type TEXT NOT NULL DEFAULT '', + patch_id TEXT, + status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'done', 'failed', 'canceled', 'applied', 'rebased')), + agentic BOOLEAN DEFAULT FALSE, + enqueued_at TIMESTAMP WITH TIME ZONE NOT NULL, + started_at TIMESTAMP WITH TIME ZONE, + finished_at TIMESTAMP WITH TIME ZONE, + prompt TEXT, + diff_content TEXT, + error TEXT, + token_usage TEXT, + worktree_path TEXT, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.reviews ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + agent TEXT NOT NULL, + prompt TEXT NOT NULL, + 
output TEXT NOT NULL, + closed BOOLEAN NOT NULL DEFAULT FALSE, + updated_by_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS roborev.responses ( + id SERIAL PRIMARY KEY, + uuid UUID UNIQUE NOT NULL, + job_uuid UUID NOT NULL REFERENCES roborev.review_jobs(uuid), + responder TEXT NOT NULL, + response TEXT NOT NULL, + source_machine_id UUID NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_review_jobs_source ON roborev.review_jobs(source_machine_id); +CREATE INDEX IF NOT EXISTS idx_review_jobs_updated ON roborev.review_jobs(updated_at); +-- Note: idx_review_jobs_branch, idx_review_jobs_job_type, and +-- idx_review_jobs_patch_id are created by migration code, not here +-- (to support upgrades from older versions where those columns +-- don't exist yet). +CREATE INDEX IF NOT EXISTS idx_reviews_job_uuid ON roborev.reviews(job_uuid); +CREATE INDEX IF NOT EXISTS idx_reviews_updated ON roborev.reviews(updated_at); +CREATE INDEX IF NOT EXISTS idx_responses_job_uuid ON roborev.responses(job_uuid); +CREATE INDEX IF NOT EXISTS idx_responses_id ON roborev.responses(id); + +CREATE TABLE IF NOT EXISTS roborev.sync_metadata ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); diff --git a/internal/storage/sync.go b/internal/storage/sync.go index 8c0a5816d..6a54d9518 100644 --- a/internal/storage/sync.go +++ b/internal/storage/sync.go @@ -286,6 +286,7 @@ type SyncableJob struct { DiffContent *string Error string TokenUsage string + WorktreePath string SourceMachineID string UpdatedAt time.Time } @@ -300,7 +301,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) j.git_ref, COALESCE(j.session_id, ''), j.agent, COALESCE(j.model, ''), COALESCE(j.reasoning, ''), COALESCE(j.job_type, 'review'), COALESCE(j.review_type, ''), COALESCE(j.patch_id, ''), j.status, j.agentic, j.enqueued_at, 
COALESCE(j.started_at, ''), COALESCE(j.finished_at, ''), COALESCE(j.prompt, ''), j.diff_content, COALESCE(j.error, ''), COALESCE(j.token_usage, ''), - j.source_machine_id, j.updated_at + COALESCE(j.worktree_path, ''), j.source_machine_id, j.updated_at FROM review_jobs j JOIN repos r ON j.repo_id = r.id LEFT JOIN commits c ON j.commit_id = c.id @@ -335,7 +336,7 @@ func (db *DB) GetJobsToSync(machineID string, limit int) ([]SyncableJob, error) &j.GitRef, &j.SessionID, &j.Agent, &j.Model, &j.Reasoning, &j.JobType, &j.ReviewType, &j.PatchID, &j.Status, &j.Agentic, &enqueuedAt, &startedAt, &finishedAt, &j.Prompt, &diffContent, &j.Error, &j.TokenUsage, - &j.SourceMachineID, &updatedAt, + &j.WorktreePath, &j.SourceMachineID, &updatedAt, ) if err != nil { return nil, fmt.Errorf("scan job: %w", err) @@ -578,8 +579,8 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error INSERT INTO review_jobs ( uuid, repo_id, commit_id, git_ref, session_id, agent, model, reasoning, job_type, review_type, patch_id, status, agentic, enqueued_at, started_at, finished_at, prompt, diff_content, error, token_usage, - source_machine_id, updated_at, synced_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + worktree_path, source_machine_id, updated_at, synced_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(uuid) DO UPDATE SET status = excluded.status, finished_at = excluded.finished_at, @@ -590,13 +591,14 @@ func (db *DB) UpsertPulledJob(j PulledJob, repoID int64, commitID *int64) error commit_id = excluded.commit_id, patch_id = excluded.patch_id, token_usage = COALESCE(excluded.token_usage, review_jobs.token_usage), + worktree_path = COALESCE(excluded.worktree_path, review_jobs.worktree_path), updated_at = excluded.updated_at, synced_at = ? 
`, j.UUID, repoID, commitID, j.GitRef, nullStr(j.SessionID), j.Agent, nullStr(j.Model), j.Reasoning, j.JobType, j.ReviewType, nullStr(j.PatchID), j.Status, j.Agentic, j.EnqueuedAt.Format(time.RFC3339), nullTimeStr(j.StartedAt), nullTimeStr(j.FinishedAt), nullStr(j.Prompt), j.DiffContent, nullStr(j.Error), nullStr(j.TokenUsage), - j.SourceMachineID, j.UpdatedAt.Format(time.RFC3339), now, now) + nullStr(j.WorktreePath), j.SourceMachineID, j.UpdatedAt.Format(time.RFC3339), now, now) return err } diff --git a/internal/storage/sync_backfill_test.go b/internal/storage/sync_backfill_test.go index 934a90014..b9c167a61 100644 --- a/internal/storage/sync_backfill_test.go +++ b/internal/storage/sync_backfill_test.go @@ -221,3 +221,83 @@ func TestUpsertPulledJob_BackfillsModel(t *testing.T) { assert.False(t, !modelPreserved.Valid || modelPreserved.String != "gpt-4") } + +func TestGetJobsToSync_IncludesWorktreePath(t *testing.T) { + h := newSyncTestHelper(t) + + // Enqueue a job with a worktree path + commit, err := h.db.GetOrCreateCommit(h.repo.ID, "wt-sync-abc", "Author", "Subject", time.Now()) + require.NoError(t, err) + + job, err := h.db.EnqueueJob(EnqueueOpts{ + RepoID: h.repo.ID, + CommitID: commit.ID, + GitRef: "wt-sync-abc", + Agent: "test", + WorktreePath: "/worktrees/feature-branch", + }) + require.NoError(t, err) + + // Complete the job so it becomes syncable + claimed, err := h.db.ClaimJob("w-sync-wt") + require.NoError(t, err) + require.Equal(t, job.ID, claimed.ID) + require.NoError(t, h.db.CompleteJob(job.ID, "test", "prompt", "PASS")) + + jobs, err := h.db.GetJobsToSync(h.machineID, 10) + require.NoError(t, err) + require.NotEmpty(t, jobs) + + var found *SyncableJob + for i := range jobs { + if jobs[i].UUID == job.UUID { + found = &jobs[i] + break + } + } + require.NotNil(t, found, "expected job %s in sync results", job.UUID) + assert.Equal(t, "/worktrees/feature-branch", found.WorktreePath) +} + +func TestUpsertPulledJob_PreservesWorktreePath(t *testing.T) { + 
db := openTestDB(t) + defer db.Close() + + repo, err := db.GetOrCreateRepo("/test/repo-wt-sync") + require.NoError(t, err) + + jobUUID := "test-uuid-wt-" + time.Now().Format("20060102150405") + + pulledJob := PulledJob{ + UUID: jobUUID, + RepoIdentity: "/test/repo-wt-sync", + GitRef: "HEAD", + Agent: "test-agent", + Status: "done", + WorktreePath: "/worktrees/my-branch", + SourceMachineID: "test-machine", + EnqueuedAt: time.Now(), + UpdatedAt: time.Now(), + } + err = db.UpsertPulledJob(pulledJob, repo.ID, nil) + require.NoError(t, err) + + // Verify worktree_path was stored + var wt string + err = db.QueryRow( + `SELECT COALESCE(worktree_path, '') FROM review_jobs WHERE uuid = ?`, jobUUID, + ).Scan(&wt) + require.NoError(t, err) + assert.Equal(t, "/worktrees/my-branch", wt) + + // Upsert with empty worktree_path should not clear existing value + pulledJob.WorktreePath = "" + err = db.UpsertPulledJob(pulledJob, repo.ID, nil) + require.NoError(t, err) + + err = db.QueryRow( + `SELECT COALESCE(worktree_path, '') FROM review_jobs WHERE uuid = ?`, jobUUID, + ).Scan(&wt) + require.NoError(t, err) + assert.Equal(t, "/worktrees/my-branch", wt) +}