feat: implement pull queue to prevent concurrent pulls#4
feat: implement pull queue to prevent concurrent pulls#4simonjcarr wants to merge 1 commit intomainfrom
Conversation
- Added PullQueue structure with mutex-based synchronization - Queue pull requests when a pull is already in progress - Process queue sequentially, waiting for deployment scripts to complete - Prevent duplicate pull requests in the queue - Add tests for queue functionality Fixes #3 Co-authored-by: Simon Carr <simonjcarr@users.noreply.github.com>
Code Review: Pull Queue ImplementationThank you for implementing the pull queue mechanism to prevent concurrent pulls. Here's my comprehensive review: ✅ Strengths
🔍 Issues and Recommendations1. Race Condition in Pull Queue ProcessingIssue: The Recommendation: Use condition variables more effectively: func (m *MonitorV2) processPullQueue() {
for {
pr, ok := m.pullQueue.GetNext()
if !ok {
// GetNext should block internally using cond.Wait()
// instead of returning false
continue
}
m.executePull(pr.Repo, pr.RepoLogger)
m.pullQueue.SetProcessing(false)
}
}2. Resource Leak: Goroutine Never TerminatesIssue: The Recommendation: Add a shutdown channel: type MonitorV2 struct {
config *Config
pullQueue *PullQueue
shutdown chan struct{}
}
func (m *MonitorV2) processPullQueue() {
for {
select {
case <-m.shutdown:
return
default:
// process queue
}
}
}3. Missing Error RecoveryIssue: If Recommendation: Add panic recovery: func (m *MonitorV2) processPullQueue() {
defer func() {
if r := recover(); r != nil {
logger.Error("Pull queue processor panic", zap.Any("panic", r))
// Restart processor or handle gracefully
}
}()
// ... rest of processing
}4. Incorrect Condition Variable UsageIssue: In Recommendation: Always use condition variables in a loop: for pq.processing || len(pq.queue) == 0 {
pq.cond.Wait()
}5. Test Improvements NeededIssues in pull_queue_test.go:
Recommendation: Add more robust tests: func TestPullQueueConcurrentAdd(t *testing.T) {
pq := NewPullQueue()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
repo := Repository{
URL: fmt.Sprintf("repo-%d", id),
}
pq.Add(repo, nil)
}(i)
}
wg.Wait()
if pq.Size() != 10 {
t.Errorf("Expected 10 items, got %d", pq.Size())
}
}🔒 Security Considerations
🚀 Performance Considerations
📝 Minor Style Issues
💡 Suggested Enhancements
✔️ Overall AssessmentThe implementation successfully addresses the concurrent pull issue, but needs improvements in error handling, resource management, and test coverage before merging. The core queue logic is sound, but the goroutine lifecycle management and error recovery need attention. Verdict: Request changes - please address the goroutine lifecycle and error handling issues before merging. |
Implements a queue mechanism to prevent concurrent pull operations and queue subsequent requests.
Changes
Fixes #3
Generated with Claude Code