Skip to content

Commit ac973af

Browse files
authored
Merge pull request #491 from PlanExeOrg/split-run-plan-pipeline
Split run_plan_pipeline.py into individual stage files
2 parents 1ed8bee + 75e5f29 commit ac973af

69 files changed

Lines changed: 5986 additions & 3763 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/proposals/131-codebase-cleanliness-remediation-roadmap.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ Large modules make the code harder to reason about, harder to test in isolation,
7070

7171
1. ~~Split `frontend_multi_user/src/app.py` by concern into `auth`, `billing`, `admin`, `downloads`, `account`, and `plan_routes`.~~ **Done** (PR #476): Split 3,857-line monolith into 6 Flask Blueprint modules + utils (app.py reduced to 1,441 lines). Follow-up fix: updated all `url_for()` calls in templates to use blueprint-prefixed endpoint names (`plan_routes.*`, `auth.*`, `downloads.*`).
7272
2. ~~Split `mcp_cloud/http_server.py` into `middleware`, `route_registration`, `tool_http_bridge`, and `server_boot`.~~ **Done**: Split 1,439-line monolith into 4 focused modules + re-export shim.
73-
3. Convert `worker_plan/worker_plan_internal/plan/run_plan_pipeline.py` from a giant task registry file into a thin pipeline assembly module plus task-specific modules grouped by stage.
73+
3. ~~Convert `worker_plan/worker_plan_internal/plan/run_plan_pipeline.py` from a giant task registry file into a thin pipeline assembly module plus task-specific modules grouped by stage.~~ **Done**: Split 4,257-line monolith into ~66 individual stage files under `stages/` + framework-only core module (563 lines).
7474
4. Extract reusable orchestration helpers from `worker_plan_database/app.py` into focused worker, billing, and queue modules.
7575
5. Set an internal size target for service modules. As a starting rule, new files should stay below roughly 500 lines unless there is a strong reason not to.
7676

docs/superpowers/plans/2026-04-02-split-run-plan-pipeline.md

Lines changed: 1075 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
# Split run_plan_pipeline.py — Design Spec
2+
3+
**Date:** 2026-04-02
4+
**Status:** Approved
5+
**Tags:** `worker_plan`, `refactor`, `maintainability`, `agent-isolation`
6+
7+
---
8+
9+
## Goal
10+
11+
Split `worker_plan/worker_plan_internal/plan/run_plan_pipeline.py` (4,257 lines) into ~66 individual stage files plus a slim core module, optimizing for parallel agent work, `self_improve/` integration, and conflict-free DAG insertion.
12+
13+
## Problem
14+
15+
`run_plan_pipeline.py` contains ~66 Luigi task classes, the `PlanTask` base class, `ExecutePipeline`, `FullPlanPipeline`, and supporting utilities — all in one file. This means:
16+
17+
1. Two agents working on different pipeline steps must edit the same file, causing merge conflicts.
18+
2. `self_improve/` cannot target an individual step file — it must reason about a 4,257-line module.
19+
3. Inserting a new task in the DAG requires editing the giant file, risking unrelated merge conflicts with other in-flight work.
20+
21+
## Design Priorities
22+
23+
These priorities (from the user) override backward-compatibility concerns:
24+
25+
1. **Agent isolation** — each pipeline step in its own file so agents never conflict.
26+
2. **Easy DAG insertion** — adding a new task between two existing tasks touches only 2 files (the new file and the downstream file's `requires()`), never a central registry.
27+
3. **`self_improve/` addressability** — each step is an individually importable module.
28+
29+
## Approach
30+
31+
Extract each task class into its own file under a new `stages/` directory. Each file declares its own Luigi `requires()` dependencies by importing its upstream task(s). `run_plan_pipeline.py` keeps only the shared framework (`PlanTask`, `ExecutePipeline`, etc.).
32+
33+
## New Module Layout
34+
35+
### `run_plan_pipeline.py` (~280 lines, slimmed core)
36+
37+
Keeps:
38+
- All imports needed by the framework classes
39+
- `PlanTask` base class (lines 105–215)
40+
- `_task_class_to_step_label` utility (lines 3851–3862)
41+
- `PipelineProgress` dataclass (lines 3866–3871)
42+
- `HandleTaskCompletionParameters` dataclass (lines 3875–3878)
43+
- `ExecutePipeline` dataclass (lines 3882–4107)
44+
- `DemoStoppingExecutePipeline` (lines 4109–4116)
45+
- `configure_logging` (lines 4119–4148)
46+
- `__main__` block (lines 4152–4258)
47+
- Module-level constants: `logger`, `DEFAULT_LLM_MODEL`, `REPORT_EXECUTE_PLAN_SECTION_HIDDEN`
48+
49+
Changes:
50+
- `ExecutePipeline.setup()` imports `FullPlanPipeline` from `stages.full_plan_pipeline` instead of referencing it as a module-level class.
51+
- The `__main__` block's `DemoStoppingExecutePipeline` reference stays (it's in the same file).
52+
- All ~66 task class definitions and their imports are removed.
53+
54+
### `stages/` directory
55+
56+
Each file contains exactly one task class (except where noted). Files follow this pattern:
57+
58+
```python
59+
"""Pipeline stage: <one-line description>."""
60+
import logging
61+
from worker_plan_internal.plan.run_plan_pipeline import PlanTask
62+
from worker_plan_api.filenames import FilenameEnum
63+
# ... executor imports specific to this stage
64+
# ... upstream task imports from sibling stage files
65+
66+
logger = logging.getLogger(__name__)
67+
68+
class FooTask(PlanTask):
69+
def requires(self):
70+
return self.clone(UpstreamTask)
71+
72+
def output(self):
73+
return self.local_target(FilenameEnum.FOO)
74+
75+
def run_with_llm(self, llm):
76+
# ... verbatim from current code
77+
```
78+
79+
### `stages/__init__.py`
80+
81+
Empty file. Stage files are imported directly by whoever needs them (other stages, `FullPlanPipeline`).
82+
83+
### `stages/full_plan_pipeline.py`
84+
85+
Contains `FullPlanPipeline` which imports all task classes and lists them in `requires()`. This is the only file that imports every stage — it's the DAG root.
86+
87+
**Note:** `FullPlanPipeline.requires()` currently lists ALL tasks explicitly (not just leaves). This is preserved verbatim to maintain identical Luigi behavior. Optimizing to leaf-only listing is a separate future change.
88+
89+
### Complete file list
90+
91+
```
92+
stages/
93+
├── __init__.py
94+
├── full_plan_pipeline.py
95+
96+
│ # Phase 1: Input & Validation
97+
├── start_time.py (StartTimeTask)
98+
├── setup.py (SetupTask)
99+
├── redline_gate.py (RedlineGateTask)
100+
├── premise_attack.py (PremiseAttackTask)
101+
102+
│ # Phase 2: Purpose & Classification
103+
├── identify_purpose.py (IdentifyPurposeTask)
104+
├── plan_type.py (PlanTypeTask)
105+
106+
│ # Phase 3: Strategic Options & Scenarios
107+
├── potential_levers.py (PotentialLeversTask)
108+
├── deduplicate_levers.py (DeduplicateLeversTask)
109+
├── enrich_levers.py (EnrichLeversTask)
110+
├── focus_on_vital_few_levers.py (FocusOnVitalFewLeversTask)
111+
├── strategic_decisions_markdown.py (StrategicDecisionsMarkdownTask)
112+
├── candidate_scenarios.py (CandidateScenariosTask)
113+
├── select_scenario.py (SelectScenarioTask)
114+
├── scenarios_markdown.py (ScenariosMarkdownTask)
115+
116+
│ # Phase 4: Context
117+
├── physical_locations.py (PhysicalLocationsTask)
118+
├── currency_strategy.py (CurrencyStrategyTask)
119+
├── identify_risks.py (IdentifyRisksTask)
120+
121+
│ # Phase 5: Assumptions
122+
├── make_assumptions.py (MakeAssumptionsTask)
123+
├── distill_assumptions.py (DistillAssumptionsTask)
124+
├── review_assumptions.py (ReviewAssumptionsTask)
125+
├── consolidate_assumptions_markdown.py (ConsolidateAssumptionsMarkdownTask)
126+
127+
│ # Phase 6: Plan Foundation
128+
├── pre_project_assessment.py (PreProjectAssessmentTask)
129+
├── project_plan.py (ProjectPlanTask)
130+
131+
│ # Phase 7: Governance
132+
├── governance_phase1_audit.py (GovernancePhase1AuditTask)
133+
├── governance_phase2_bodies.py (GovernancePhase2BodiesTask)
134+
├── governance_phase3_impl_plan.py (GovernancePhase3ImplPlanTask)
135+
├── governance_phase4_decision_escalation_matrix.py (GovernancePhase4DecisionEscalationMatrixTask)
136+
├── governance_phase5_monitoring_progress.py (GovernancePhase5MonitoringProgressTask)
137+
├── governance_phase6_extra.py (GovernancePhase6ExtraTask)
138+
├── consolidate_governance.py (ConsolidateGovernanceTask)
139+
140+
│ # Phase 8: Team
141+
├── related_resources.py (RelatedResourcesTask)
142+
├── find_team_members.py (FindTeamMembersTask)
143+
├── enrich_team_contract_type.py (EnrichTeamMembersWithContractTypeTask)
144+
├── enrich_team_background_story.py (EnrichTeamMembersWithBackgroundStoryTask)
145+
├── enrich_team_environment_info.py (EnrichTeamMembersWithEnvironmentInfoTask)
146+
├── review_team.py (ReviewTeamTask)
147+
├── team_markdown.py (TeamMarkdownTask)
148+
149+
│ # Phase 9: Analysis
150+
├── swot_analysis.py (SWOTAnalysisTask)
151+
├── expert_review.py (ExpertReviewTask)
152+
153+
│ # Phase 10: Documents
154+
├── data_collection.py (DataCollectionTask)
155+
├── identify_documents.py (IdentifyDocumentsTask)
156+
├── filter_documents_to_find.py (FilterDocumentsToFindTask)
157+
├── filter_documents_to_create.py (FilterDocumentsToCreateTask)
158+
├── draft_documents_to_find.py (DraftDocumentsToFindTask)
159+
├── draft_documents_to_create.py (DraftDocumentsToCreateTask)
160+
├── markdown_documents.py (MarkdownWithDocumentsToCreateAndFindTask)
161+
162+
│ # Phase 11: WBS & Pitch
163+
├── create_wbs_level1.py (CreateWBSLevel1Task)
164+
├── create_wbs_level2.py (CreateWBSLevel2Task)
165+
├── wbs_project_level1_and_level2.py (WBSProjectLevel1AndLevel2Task)
166+
├── create_pitch.py (CreatePitchTask)
167+
├── convert_pitch_to_markdown.py (ConvertPitchToMarkdownTask)
168+
├── identify_task_dependencies.py (IdentifyTaskDependenciesTask)
169+
├── estimate_task_durations.py (EstimateTaskDurationsTask)
170+
├── create_wbs_level3.py (CreateWBSLevel3Task)
171+
├── wbs_project_level1_level2_level3.py (WBSProjectLevel1AndLevel2AndLevel3Task)
172+
173+
│ # Phase 12: Schedule & Final Review
174+
├── create_schedule.py (CreateScheduleTask)
175+
├── review_plan.py (ReviewPlanTask)
176+
├── executive_summary.py (ExecutiveSummaryTask)
177+
├── questions_and_answers.py (QuestionsAndAnswersTask)
178+
├── premortem.py (PremortemTask)
179+
├── self_audit.py (SelfAuditTask)
180+
181+
│ # Phase 13: Report
182+
└── report.py (ReportTask)
183+
```
184+
185+
## DAG Insertion Example
186+
187+
To insert a new `ValidateBudgetTask` between `CurrencyStrategyTask` and `IdentifyRisksTask`:
188+
189+
1. Create `stages/validate_budget.py`:
190+
```python
191+
from worker_plan_internal.plan.stages.currency_strategy import CurrencyStrategyTask
192+
193+
class ValidateBudgetTask(PlanTask):
194+
def requires(self):
195+
return self.clone(CurrencyStrategyTask)
196+
# ...
197+
```
198+
199+
2. Edit `stages/identify_risks.py` — change its `requires()` to depend on `ValidateBudgetTask` instead of `CurrencyStrategyTask`.
200+
201+
3. Add one line to `stages/full_plan_pipeline.py`'s `requires()` dict.
202+
203+
Only 3 files touched. No other stage files are affected.
204+
205+
## What Does NOT Change
206+
207+
- No behavioral changes — identical DAG, identical task execution, identical output files
208+
- No new dependencies
209+
- No changes to Luigi task parameters, output filenames, or callback mechanisms
210+
- No changes to `worker_plan_database/app.py` (imports `ExecutePipeline` from `run_plan_pipeline`, which stays)
211+
- No changes to `test_step_label.py` (imports `_task_class_to_step_label` from `run_plan_pipeline`, which stays)
212+
- The `__main__` entry point still works: `python -m worker_plan_internal.plan.run_plan_pipeline`
213+
214+
## Code Movement Rules
215+
216+
1. **Verbatim extraction** — task class bodies are moved exactly as-is, no logic changes.
217+
2. **Each file gets only the imports it needs** — no bulk import block copied to every file.
218+
3. **The `REPORT_EXECUTE_PLAN_SECTION_HIDDEN` constant** stays in `run_plan_pipeline.py` since it's used by `ReportTask`. `stages/report.py` imports it from there.
219+
220+
## AGENTS.md Update
221+
222+
`worker_plan/AGENTS.md` will be updated to document:
223+
- The `stages/` directory and the one-file-per-task convention
224+
- How to add a new pipeline stage (create file, wire `requires()`, add to `FullPlanPipeline`)
225+
- That `run_plan_pipeline.py` contains only shared framework, not task implementations
226+
227+
## Risks
228+
229+
1. **Circular imports**: `stages/*.py` files import `PlanTask` from `run_plan_pipeline.py`. `run_plan_pipeline.py` imports `FullPlanPipeline` from `stages/full_plan_pipeline.py` (only inside `ExecutePipeline.setup()`, not at module level). No circularity because the cross-import is deferred to method call time.
230+
231+
2. **Luigi task discovery**: Luigi resolves tasks via `requires()` chains starting from `FullPlanPipeline`. Since each stage file imports its upstream tasks, all task classes are loaded when `FullPlanPipeline` is imported. No registration mechanism needed.
232+
233+
3. **`self_improve/` runner compatibility**: The runner imports and executes step source files directly. After the split, individual step executor classes (like `IdentifyPotentialLevers`) still live in their original locations (`worker_plan_internal/lever/`). The Luigi task wrappers move to `stages/`, but `self_improve/` doesn't import those — it imports the executor classes directly. No changes needed.

worker_plan/AGENTS.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,29 @@ consumers.
5050
`rate_limit`, etc.) stored in `usage_metrics.jsonl`. Unknown errors preserve
5151
a truncated `error_detail` field.
5252

53+
## Pipeline Stages (`worker_plan_internal/plan/stages/`)
54+
55+
Each Luigi pipeline task lives in its own file under `stages/`. This enables:
56+
- Multiple agents working on different stages without merge conflicts
57+
- `self_improve/` targeting individual step files
58+
- Easy DAG insertion (create new file, update downstream `requires()`)
59+
60+
### Convention for new stages
61+
62+
1. Create `stages/<stage_name>.py` with one task class
63+
2. Import `PlanTask` from `worker_plan_internal.plan.run_plan_pipeline`
64+
3. Import upstream task dependencies from sibling stage files
65+
4. Declare dependencies via `requires()` returning upstream task(s)
66+
5. Add the new task to `stages/full_plan_pipeline.py`'s `requires()` dict
67+
68+
### Framework location
69+
70+
`run_plan_pipeline.py` contains only shared framework:
71+
- `PlanTask` (base class for all stages)
72+
- `ExecutePipeline`, `HandleTaskCompletionParameters`, `PipelineProgress`
73+
- `_task_class_to_step_label`, `configure_logging`
74+
- `__main__` entry point
75+
5376
## Testing
5477
- Prefer unit tests over manual server checks. Run `python test.py` from repo
5578
root; worker tests live under `worker_plan/worker_plan_internal/**/tests` and

0 commit comments

Comments
 (0)