Skip to content

feat: implement pull queue to prevent concurrent pulls#4

Closed
simonjcarr wants to merge 1 commit intomainfrom
claude/issue-3-20250923-2148
Closed

feat: implement pull queue to prevent concurrent pulls#4
simonjcarr wants to merge 1 commit intomainfrom
claude/issue-3-20250923-2148

Conversation

@simonjcarr
Copy link
Owner

Implements a queue mechanism to prevent concurrent pull operations and queue subsequent requests.

Changes

  • Added PullQueue structure with mutex-based synchronization
  • Queue pull requests when a pull is already in progress
  • Process queue sequentially, waiting for deployment scripts to complete
  • Prevent duplicate pull requests in the queue
  • Add tests for queue functionality

Fixes #3

Generated with Claude Code

- Added PullQueue structure with mutex-based synchronization
- Queue pull requests when a pull is already in progress
- Process queue sequentially, waiting for deployment scripts to complete
- Prevent duplicate pull requests in the queue
- Add tests for queue functionality

Fixes #3

Co-authored-by: Simon Carr <simonjcarr@users.noreply.github.com>
@claude
Copy link

claude bot commented Sep 23, 2025

Code Review: Pull Queue Implementation

Thank you for implementing the pull queue mechanism to prevent concurrent pulls. Here's my comprehensive review:

Strengths

  • Solid concurrency handling: Proper use of mutex and condition variables for thread-safe queue operations
  • Duplicate prevention: Good implementation to avoid queuing duplicate pull requests
  • Clean separation of concerns: Queue logic properly separated into its own file
  • Test coverage: Basic test coverage for queue functionality

🔍 Issues and Recommendations

1. Race Condition in Pull Queue Processing

Issue: The processPullQueue() goroutine has a potential race condition. When GetNext() returns false, it sleeps for 1 second, but new items added during this sleep won't be processed immediately.

Recommendation: Use condition variables more effectively:

func (m *MonitorV2) processPullQueue() {
    for {
        pr, ok := m.pullQueue.GetNext()
        if !ok {
            // GetNext should block internally using cond.Wait()
            // instead of returning false
            continue
        }
        m.executePull(pr.Repo, pr.RepoLogger)
        m.pullQueue.SetProcessing(false)
    }
}

2. Resource Leak: Goroutine Never Terminates

Issue: The processPullQueue() goroutine runs forever with no shutdown mechanism (internal/monitor_v2.go:32).

Recommendation: Add a shutdown channel:

type MonitorV2 struct {
    config    *Config
    pullQueue *PullQueue
    shutdown  chan struct{}
}

func (m *MonitorV2) processPullQueue() {
    for {
        select {
        case <-m.shutdown:
            return
        default:
            // process queue
        }
    }
}

3. Missing Error Recovery

Issue: If executePull panics, the entire queue processor dies and pulls stop working.

Recommendation: Add panic recovery:

func (m *MonitorV2) processPullQueue() {
    defer func() {
        if r := recover(); r != nil {
            logger.Error("Pull queue processor panic", zap.Any("panic", r))
            // Restart processor or handle gracefully
        }
    }()
    // ... rest of processing
}

4. Incorrect Condition Variable Usage

Issue: In GetNext() (internal/pull_queue.go:74), the condition wait loop could spuriously wake up.

Recommendation: Always use condition variables in a loop:

for pq.processing || len(pq.queue) == 0 {
    pq.cond.Wait()
}

5. Test Improvements Needed

Issues in pull_queue_test.go:

  • Test uses hardcoded timeouts (100ms) which could be flaky in CI
  • Missing test for concurrent Add operations
  • No test for panic recovery
  • No test for proper cleanup/shutdown

Recommendation: Add more robust tests:

func TestPullQueueConcurrentAdd(t *testing.T) {
    pq := NewPullQueue()
    var wg sync.WaitGroup
    
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            repo := Repository{
                URL: fmt.Sprintf("repo-%d", id),
            }
            pq.Add(repo, nil)
        }(i)
    }
    
    wg.Wait()
    if pq.Size() != 10 {
        t.Errorf("Expected 10 items, got %d", pq.Size())
    }
}

🔒 Security Considerations

  • ✅ No obvious security vulnerabilities
  • ✅ Proper mutex usage prevents race conditions
  • ⚠️ Consider adding max queue size to prevent memory exhaustion from malicious/broken webhook spam

🚀 Performance Considerations

  1. Queue could grow unbounded: Add a max queue size with appropriate handling
  2. Single-threaded processing: Consider if parallel processing of different repos is safe and beneficial
  3. Busy wait in GetNext: The condition variable implementation could be more efficient

📝 Minor Style Issues

  1. Missing newline at end of pull_queue.go and pull_queue_test.go files
  2. Consider adding queue metrics (depth, processing time) for monitoring

💡 Suggested Enhancements

  1. Add queue depth metrics/logging for observability
  2. Consider priority queue for critical repos
  3. Add configurable max queue size
  4. Implement graceful shutdown mechanism

✔️ Overall Assessment

The implementation successfully addresses the concurrent pull issue, but needs improvements in error handling, resource management, and test coverage before merging. The core queue logic is sound, but the goroutine lifecycle management and error recovery need attention.

Verdict: Request changes - please address the goroutine lifecycle and error handling issues before merging.

@simonjcarr simonjcarr closed this Sep 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

1 participant