
refactor: make agent loop support parallel and update docs#2503

Merged
yinwm merged 1 commit into sipeed:main from cytown:loop
Apr 16, 2026

Conversation

Contributor

@cytown cytown commented Apr 13, 2026

📝 Description

Based on PR#2481; please review after that PR has been merged.

Refactored the AgentLoop in pkg/agent/loop.go to support parallel processing of user messages with correct response routing. Here's what was implemented:

Key Changes

 1. Worker Pool Architecture (loop.go):
    - Added workerPoolSize and workerSem (semaphore) fields to AgentLoop struct
    - Added activeWorkers counter for monitoring
    - Workers are dispatched via goroutines with semaphore-based concurrency limiting

 2. Enhanced `Run()` Method (loop.go:468-543):
    - Replaced sequential message processing with worker pool dispatcher
    - Messages with different session keys are processed in parallel (up to MaxParallelTurns)
    - Messages with same session key are enqueued to steering queue (preserves conversation integrity)
    - System messages are processed immediately in synchronous mode

 3. New Helper Methods:
    - processMessageSync(): Handles non-routable messages synchronously
    - runTurnWithSteering(): Runs a complete turn and drains steering queue in worker context

 4. Configuration Support (config.go:262):
    - Added MaxParallelTurns field to AgentDefaults struct
    - Default value: 1 (preserves backward compatibility - sequential processing)
    - Environment variable: PICOCLAW_AGENTS_DEFAULTS_MAX_PARALLEL_TURNS

 5. Removed Obsolete Code:
    - Removed drainBusToSteering() - no longer needed with direct steering queue enrollment
    - Removed associated test TestDrainBusToSteering_RequeuesDifferentScopeMessage

 6. Comprehensive Tests (loop_test.go):
    - TestParallelMessageProcessing_DifferentSessionsProcessedConcurrently: Verifies parallel execution (achieved 3
      concurrent turns)
    - TestParallelMessageProcessing_SameSessionProcessedSequentially: Verifies same-session messages are queued

How It Works

Inbound Message Flow:
┌─────────────────────────────────────────┐
│  bus.InboundChan()                      │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│  resolveSteeringTarget(msg)             │
│  Returns: sessionKey, agentID, ok       │
└──────────────┬──────────────────────────┘
               │
        ┌──────┴──────┐
        │             │
        ▼             ▼
   Not routable   Has session
   (system msg)   key
        │             │
        │             ├─► activeTurnStates.Load(sessionKey)?
        │             │         │
        │             │    Yes  │  No
        │             │         │     │
        ▼             │         ▼     ▼
 processMessageSync   │   Enqueue   Acquire workerSem
                      │   to        slot
                      │   steering  │
                      │   queue     ▼
                      │         runTurnWithSteering()
                      │         (processes message +
                      │          drains steering queue)

🗣️ Type of Change

  • 🐞 Bug fix (non-breaking change which fixes an issue)
  • ✨ New feature (non-breaking change which adds functionality)
  • 📖 Documentation update
  • ⚡ Code refactoring (no functional changes, no API changes)

🤖 AI Code Generation

  • 🤖 Fully AI-generated (100% AI, 0% Human)
  • 🛠️ Mostly AI-generated (AI draft, Human verified/modified)
  • 👨‍💻 Mostly Human-written (Human lead, AI assisted or none)

🔗 Related Issue

📚 Technical Context (Skip for Docs)

  • Reference URL:
  • Reasoning:

🧪 Test Environment

  • Hardware:
  • OS:
  • Model/Provider:
  • Channels:

📸 Evidence (Optional)

Click to view Logs/Screenshots

☑️ Checklist

  • My code/docs follow the style of this project.
  • I have performed a self-review of my own changes.
  • I have updated the documentation accordingly.

Collaborator

@yinwm yinwm left a comment


Thanks for the thorough refactor! The worker pool architecture is clean and well-documented. However, I found two critical issues with the shared placeholderTurnState singleton that need to be addressed before merging:

1. Worker placeholder cleanup can delete another worker's placeholder (breaks session serialization)

In loop.go:546-553, the safety-net defer checks ts.turnID == "pending" to detect a stale placeholder. Since placeholderTurnState is a package-level singleton shared across all sessions, Worker A's cleanup defer can accidentally delete Worker B's freshly-stored placeholder for the same session:

Worker A finishes → clearActiveTurn deletes session key
Run() main loop → LoadOrStore stores new placeholder → spawns Worker B
Worker A's cleanup defer → Load finds placeholder (Worker B's) → deletes it
Run() main loop → LoadOrStore succeeds again → spawns Worker C
→ Worker B and Worker C run concurrently for the same session ❌

Fix: Create a unique placeholder instance per LoadOrStore call (e.g., &turnState{turnID: "pending-" + sessionKey}), or use a unique ID to distinguish "my placeholder" from "someone else's placeholder" in the cleanup defer.

2. HardAbort/InterruptHard can mutate the shared placeholder singleton

HardAbort() (steering.go:489) does a type assertion tsInterface.(*turnState) which succeeds for placeholderTurnState since it's also *turnState. This calls ts.Finish(true) which permanently marks the global singleton as finished, affecting all future sessions that use it. Similarly, InterruptHard() via getAnyActiveTurnState() can call requestHardAbort() on the placeholder.

Fix: Add a guard in HardAbort and InterruptHard:

if ts.turnID == "pending" {
    return fmt.Errorf("turn is still initializing for session %s", sessionKey)
}

Additional non-blocking suggestions

  • Continue() placeholder handling: In steering.go:355-357, GetActiveTurnBySession returns non-nil for placeholder (turnID="pending"), causing Continue() to return an error instead of gracefully yielding. This can prematurely break the steering drain loop in runTurnWithSteering.
  • sentTargets memory leak: in message.go:68, ResetSentInRound truncates the slice but never deletes the map key. Over time with many unique sessions, the map grows unbounded.
  • Dead code: drainBusToSteering is no longer called from Run() (only referenced in steering_test.go).

These issues don't manifest when max_parallel_turns=1 (default), but will break session serialization when parallelism is enabled.
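The sentTargets point comes down to slice truncation versus map deletion. A minimal illustration (the sentTargets name is from message.go; everything else here is hypothetical):

```go
package main

import "fmt"

// resetLeaky mimics the reported bug: truncating the slice empties it but
// keeps the map key (and the slice's backing array) alive forever.
func resetLeaky(sentTargets map[string][]string, sessionKey string) {
	sentTargets[sessionKey] = sentTargets[sessionKey][:0]
}

// resetFixed releases the entry entirely, so the map cannot grow
// unbounded as unique sessions come and go.
func resetFixed(sentTargets map[string][]string, sessionKey string) {
	delete(sentTargets, sessionKey)
}

func main() {
	leaky := map[string][]string{"s1": {"a"}, "s2": {"b"}}
	fixed := map[string][]string{"s1": {"a"}, "s2": {"b"}}
	resetLeaky(leaky, "s1")
	resetFixed(fixed, "s1")
	fmt.Println(len(leaky), len(fixed)) // 2 1
}
```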

Contributor Author

cytown commented Apr 16, 2026


all done.

Collaborator

@yinwm yinwm left a comment


LGTM. Both CRITICAL issues from the first round are properly fixed:

  • Placeholder is now a unique per-claim instance (makePendingTurnID with sequence number)
  • HardAbort guards against pending turns with prefix check
  • sentTargets memory leak fixed with delete()
  • Dead code (drainBusToSteering, requeueInboundMessage) cleaned up

The extra cleanup (vision retry inline, code reorganization) is a nice bonus.

@yinwm yinwm merged commit eb24269 into sipeed:main Apr 16, 2026
4 checks passed
@cytown cytown deleted the loop branch April 16, 2026 16:46
