Skip to content

Commit c114248

Browse files
authored
Merge pull request #183 from CaitlynByrne/feat/pause-drain
feat: add graceful pause (drain mode) for running agents
2 parents 76dd4b8 + 9721368 commit c114248

12 files changed

Lines changed: 311 additions & 24 deletions

File tree

autoforge_paths.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
assistant.db-shm
4040
.agent.lock
4141
.devserver.lock
42+
.pause_drain
4243
.claude_settings.json
4344
.claude_assistant_settings.json
4445
.claude_settings.expand.*.json
@@ -146,6 +147,15 @@ def get_claude_assistant_settings_path(project_dir: Path) -> Path:
146147
return _resolve_path(project_dir, ".claude_assistant_settings.json")
147148

148149

150+
def get_pause_drain_path(project_dir: Path) -> Path:
    """Locate the ``.pause_drain`` signal file for a project.

    The presence of this file requests a graceful pause (drain mode).
    Because it is only a transient signal, the path is always built
    under the new ``.autoforge`` location.
    """
    autoforge_dir = project_dir / ".autoforge"
    return autoforge_dir / ".pause_drain"
157+
158+
149159
def get_progress_cache_path(project_dir: Path) -> Path:
150160
"""Resolve the path to ``.progress_cache``."""
151161
return _resolve_path(project_dir, ".progress_cache")

parallel_orchestrator.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ def __init__(
213213
# Signal handlers only set this flag; cleanup happens in the main loop
214214
self._shutdown_requested = False
215215

216+
# Graceful pause (drain mode) flag
217+
self._drain_requested = False
218+
216219
# Session tracking for logging/debugging
217220
self.session_start_time: datetime | None = None
218221

@@ -1387,6 +1390,9 @@ async def run_loop(self):
13871390
# Must happen before any debug_log.log() calls
13881391
debug_log.start_session()
13891392

1393+
# Clear any stale drain signal from a previous session
1394+
self._clear_drain_signal()
1395+
13901396
# Log startup to debug file
13911397
debug_log.section("ORCHESTRATOR STARTUP")
13921398
debug_log.log("STARTUP", "Orchestrator run_loop starting",
@@ -1508,6 +1514,34 @@ async def run_loop(self):
15081514
print("\nAll features complete!", flush=True)
15091515
break
15101516

1517+
# --- Graceful pause (drain mode) ---
1518+
if not self._drain_requested and self._check_drain_signal():
1519+
self._drain_requested = True
1520+
print("Graceful pause requested - draining running agents...", flush=True)
1521+
debug_log.log("DRAIN", "Graceful pause requested, draining running agents")
1522+
1523+
if self._drain_requested:
1524+
with self._lock:
1525+
coding_count = len(self.running_coding_agents)
1526+
testing_count = len(self.running_testing_agents)
1527+
1528+
if coding_count == 0 and testing_count == 0:
1529+
print("All agents drained - paused.", flush=True)
1530+
debug_log.log("DRAIN", "All agents drained, entering paused state")
1531+
# Wait until signal file is removed (resume) or shutdown
1532+
while self._check_drain_signal() and self.is_running and not self._shutdown_requested:
1533+
await asyncio.sleep(1)
1534+
if not self.is_running or self._shutdown_requested:
1535+
break
1536+
self._drain_requested = False
1537+
print("Resuming from graceful pause...", flush=True)
1538+
debug_log.log("DRAIN", "Resuming from graceful pause")
1539+
continue
1540+
else:
1541+
debug_log.log("DRAIN", f"Waiting for agents to finish: coding={coding_count}, testing={testing_count}")
1542+
await self._wait_for_agent_completion()
1543+
continue
1544+
15111545
# Maintain testing agents independently (runs every iteration)
15121546
self._maintain_testing_agents(feature_dicts)
15131547

@@ -1632,6 +1666,17 @@ def get_status(self) -> dict:
16321666
"yolo_mode": self.yolo_mode,
16331667
}
16341668

1669+
def _check_drain_signal(self) -> bool:
    """Report whether a graceful pause (drain) has been requested.

    Returns:
        True when the drain signal file for ``self.project_dir`` is
        present, False otherwise.
    """
    from autoforge_paths import get_pause_drain_path

    signal_file = get_pause_drain_path(self.project_dir)
    return signal_file.exists()
1673+
1674+
def _clear_drain_signal(self) -> None:
    """Remove the drain signal file, if any, and reset the drain flag."""
    from autoforge_paths import get_pause_drain_path

    signal_file = get_pause_drain_path(self.project_dir)
    # missing_ok: an absent file is not an error here.
    signal_file.unlink(missing_ok=True)
    self._drain_requested = False
1679+
16351680
def cleanup(self) -> None:
16361681
"""Clean up database resources. Safe to call multiple times.
16371682

server/routers/agent.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,31 @@ async def resume_agent(project_name: str):
175175
status=manager.status,
176176
message=message,
177177
)
178+
179+
180+
@router.post("/graceful-pause", response_model=AgentActionResponse)
async def graceful_pause_agent(project_name: str):
    """Ask the project's agent to drain: finish current work, then pause."""
    manager = get_project_manager(project_name)
    outcome = await manager.graceful_pause()
    ok, detail = outcome

    # The status is read only after the request has been made.
    return AgentActionResponse(success=ok, status=manager.status, message=detail)
192+
193+
194+
@router.post("/graceful-resume", response_model=AgentActionResponse)
async def graceful_resume_agent(project_name: str):
    """Bring the project's agent back from a graceful pause."""
    manager = get_project_manager(project_name)
    outcome = await manager.graceful_resume()
    ok, detail = outcome

    # The status is read only after the request has been made.
    return AgentActionResponse(success=ok, status=manager.status, message=detail)

server/schemas.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ def validate_testing_ratio(cls, v: int | None) -> int | None:
217217

218218
class AgentStatus(BaseModel):
219219
"""Current agent status."""
220-
status: Literal["stopped", "running", "paused", "crashed"]
220+
status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]
221221
pid: int | None = None
222222
started_at: datetime | None = None
223223
yolo_mode: bool = False

server/services/process_manager.py

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def __init__(
7777
self.project_dir = project_dir
7878
self.root_dir = root_dir
7979
self.process: subprocess.Popen | None = None
80-
self._status: Literal["stopped", "running", "paused", "crashed"] = "stopped"
80+
self._status: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"] = "stopped"
8181
self.started_at: datetime | None = None
8282
self._output_task: asyncio.Task | None = None
8383
self.yolo_mode: bool = False # YOLO mode for rapid prototyping
@@ -96,11 +96,11 @@ def __init__(
9696
self.lock_file = get_agent_lock_path(self.project_dir)
9797

9898
@property
99-
def status(self) -> Literal["stopped", "running", "paused", "crashed"]:
99+
def status(self) -> Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]:
100100
return self._status
101101

102102
@status.setter
103-
def status(self, value: Literal["stopped", "running", "paused", "crashed"]):
103+
def status(self, value: Literal["stopped", "running", "paused", "crashed", "pausing", "paused_graceful"]):
104104
old_status = self._status
105105
self._status = value
106106
if old_status != value:
@@ -330,6 +330,12 @@ async def _stream_output(self) -> None:
330330
for help_line in AUTH_ERROR_HELP.strip().split('\n'):
331331
await self._broadcast_output(help_line)
332332

333+
# Detect graceful pause status transitions from orchestrator output
334+
if "All agents drained - paused." in decoded:
335+
self.status = "paused_graceful"
336+
elif "Resuming from graceful pause..." in decoded:
337+
self.status = "running"
338+
333339
await self._broadcast_output(sanitized)
334340

335341
except asyncio.CancelledError:
@@ -377,7 +383,7 @@ async def start(
377383
Returns:
378384
Tuple of (success, message)
379385
"""
380-
if self.status in ("running", "paused"):
386+
if self.status in ("running", "paused", "pausing", "paused_graceful"):
381387
return False, f"Agent is already {self.status}"
382388

383389
if not self._check_lock():
@@ -526,6 +532,12 @@ async def stop(self) -> tuple[bool, str]:
526532

527533
self._cleanup_stale_features()
528534
self._remove_lock()
535+
# Clean up drain signal file if present
536+
try:
537+
from autoforge_paths import get_pause_drain_path
538+
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
539+
except Exception:
540+
pass
529541
self.status = "stopped"
530542
self.process = None
531543
self.started_at = None
@@ -586,6 +598,47 @@ async def resume(self) -> tuple[bool, str]:
586598
logger.exception("Failed to resume agent")
587599
return False, f"Failed to resume agent: {e}"
588600

601+
async def graceful_pause(self) -> tuple[bool, str]:
    """Ask the orchestrator to drain and then pause.

    Writes the drain signal file (containing the agent process PID) that
    the orchestrator polls for. Running agents finish their current work
    before the orchestrator enters its paused state. On success the
    manager status becomes ``"pausing"``.

    Returns:
        Tuple of (success, message)
    """
    # Only a live, actively running agent can be asked to drain.
    if not (self.process and self.status == "running"):
        return False, "Agent is not running"

    try:
        from autoforge_paths import get_pause_drain_path

        signal_file = get_pause_drain_path(self.project_dir)
        signal_file.parent.mkdir(parents=True, exist_ok=True)
        pid_text = str(self.process.pid)
        signal_file.write_text(pid_text)
        self.status = "pausing"
    except Exception as e:
        logger.exception("Failed to request graceful pause")
        return False, f"Failed to request graceful pause: {e}"
    else:
        return True, "Graceful pause requested"
623+
624+
async def graceful_resume(self) -> tuple[bool, str]:
    """Leave a graceful pause by deleting the drain signal file.

    Allowed while the manager status is ``"pausing"`` or
    ``"paused_graceful"``. On success the status is set back to
    ``"running"``.

    Returns:
        Tuple of (success, message)
    """
    in_graceful_pause = self.status in ("pausing", "paused_graceful")
    if not (self.process and in_graceful_pause):
        return False, "Agent is not in a graceful pause state"

    try:
        from autoforge_paths import get_pause_drain_path

        signal_file = get_pause_drain_path(self.project_dir)
        # missing_ok: an already-removed signal file is not an error.
        signal_file.unlink(missing_ok=True)
        self.status = "running"
    except Exception as e:
        logger.exception("Failed to resume from graceful pause")
        return False, f"Failed to resume: {e}"
    else:
        return True, "Agent resumed from graceful pause"
641+
589642
async def healthcheck(self) -> bool:
590643
"""
591644
Check if the agent process is still alive.
@@ -601,8 +654,14 @@ async def healthcheck(self) -> bool:
601654
poll = self.process.poll()
602655
if poll is not None:
603656
# Process has terminated
604-
if self.status in ("running", "paused"):
657+
if self.status in ("running", "paused", "pausing", "paused_graceful"):
605658
self._cleanup_stale_features()
659+
# Clean up drain signal file if present
660+
try:
661+
from autoforge_paths import get_pause_drain_path
662+
get_pause_drain_path(self.project_dir).unlink(missing_ok=True)
663+
except Exception:
664+
pass
606665
self.status = "crashed"
607666
self._remove_lock()
608667
return False
@@ -687,8 +746,14 @@ def cleanup_orphaned_locks() -> int:
687746
if not project_path.exists():
688747
continue
689748

749+
# Clean up stale drain signal files
750+
from autoforge_paths import get_autoforge_dir, get_pause_drain_path
751+
drain_file = get_pause_drain_path(project_path)
752+
if drain_file.exists():
753+
drain_file.unlink(missing_ok=True)
754+
logger.info("Removed stale drain signal file for project '%s'", name)
755+
690756
# Check both legacy and new locations for lock files
691-
from autoforge_paths import get_autoforge_dir
692757
lock_locations = [
693758
project_path / ".agent.lock",
694759
get_autoforge_dir(project_path) / ".agent.lock",

server/websocket.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@
7878
'testing_complete': re.compile(r'Feature #(\d+) testing (completed|failed)'),
7979
'all_complete': re.compile(r'All features complete'),
8080
'blocked_features': re.compile(r'(\d+) blocked by dependencies'),
81+
'drain_start': re.compile(r'Graceful pause requested'),
82+
'drain_complete': re.compile(r'All agents drained'),
83+
'drain_resume': re.compile(r'Resuming from graceful pause'),
8184
}
8285

8386

@@ -562,6 +565,30 @@ async def process_line(self, line: str) -> dict | None:
562565
'All features complete!'
563566
)
564567

568+
# Graceful pause (drain mode) events
569+
elif ORCHESTRATOR_PATTERNS['drain_start'].search(line):
570+
self.state = 'draining'
571+
update = self._create_update(
572+
'drain_start',
573+
'Draining active agents...'
574+
)
575+
576+
elif ORCHESTRATOR_PATTERNS['drain_complete'].search(line):
577+
self.state = 'paused'
578+
self.coding_agents = 0
579+
self.testing_agents = 0
580+
update = self._create_update(
581+
'drain_complete',
582+
'All agents drained. Paused.'
583+
)
584+
585+
elif ORCHESTRATOR_PATTERNS['drain_resume'].search(line):
586+
self.state = 'scheduling'
587+
update = self._create_update(
588+
'drain_resume',
589+
'Resuming feature scheduling'
590+
)
591+
565592
return update
566593

567594
def _create_update(

0 commit comments

Comments
 (0)