Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .Jules/bolt.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
## 2024-03-28 - Initial Bolt Run | Learning: The codebase is primarily Python, not TS/JS/Astro | Action: Adjust search queries to look for Python performance hotspots, such as nested loops, N+1 queries, unnecessary list comprehensions, or missing caching/memoization.
## 2024-03-28 - Optimize list comprehensions | Learning: Nested or repeated list comprehensions on large arrays cause unnecessary overhead in Python | Action: Consolidate repeated iterations into a single O(N) loop.
## 2025-02-28 - Async Orchestration Loop Refactoring | Learning: Replacing blocking synchronous OS thread sleeps with native asyncio sleeping inside the orchestrator loop improves performance by eliminating an unnecessary thread block and allowing concurrent scheduling of other tasks on the main event loop. | Action: Ensure orchestration logic running in the background uses `async def` and `await asyncio.sleep` over `threading.Thread` with `time.sleep` when interacting directly with async dependencies and queue management.
64 changes: 42 additions & 22 deletions infrastructure/distributed/resume_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,32 @@ def start(self):
self.resume_engine.start()
self.orchestrator_active = True

# Start orchestration thread
self.orchestrator_thread = threading.Thread(
target=self._orchestration_loop, daemon=True
)
self.orchestrator_thread.start()
# Start orchestration task or thread
try:
loop = asyncio.get_running_loop()
self.orchestrator_task = loop.create_task(self._orchestration_loop())
self.orchestrator_thread = None
except RuntimeError:

def run_loop():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._orchestration_loop())
loop.close()
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai bot Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: The thread fallback closes its asyncio loop without draining child tasks, so in-flight resume coroutines can be terminated during shutdown.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At infrastructure/distributed/resume_orchestrator.py, line 156:

<comment>The thread fallback closes its asyncio loop without draining child tasks, so in-flight resume coroutines can be terminated during shutdown.</comment>

<file context>
@@ -142,19 +142,32 @@ def start(self):
+                loop = asyncio.new_event_loop()
+                asyncio.set_event_loop(loop)
+                loop.run_until_complete(self._orchestration_loop())
+                loop.close()
+
+            self.orchestrator_thread = threading.Thread(target=run_loop, daemon=True)
</file context>
Suggested change
loop.close()
pending = asyncio.all_tasks(loop)
for task in pending:
task.cancel()
if pending:
loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True)
)
loop.close()
Fix with Cubic


self.orchestrator_thread = threading.Thread(target=run_loop, daemon=True)
self.orchestrator_thread.start()
self.orchestrator_task = None

logger.info("Resume orchestrator started")

def stop(self):
"""Stop the resume orchestrator"""

self.orchestrator_active = False
if self.orchestrator_thread:
if hasattr(self, "orchestrator_task") and self.orchestrator_task:
self.orchestrator_task.cancel()
if hasattr(self, "orchestrator_thread") and self.orchestrator_thread:
self.orchestrator_thread.join(timeout=10)

self.resume_engine.stop()
Expand Down Expand Up @@ -317,9 +330,11 @@ async def resume_process_with_dependencies(
"success": success,
"resume_time_seconds": resume_time,
"timestamp": datetime.utcnow().isoformat(),
"interruption_type": interruption_context.interruption_type.value
if interruption_context
else None,
"interruption_type": (
interruption_context.interruption_type.value
if interruption_context
else None
),
}
)

Expand All @@ -334,7 +349,7 @@ async def resume_process_with_dependencies(
self._release_resources(process_id)
self.active_resumes.discard(process_id)

def _orchestration_loop(self):
async def _orchestration_loop(self):
"""Main orchestration loop running in background thread"""

while self.orchestrator_active:
Expand All @@ -349,11 +364,10 @@ def _orchestration_loop(self):
process_id = resume_request["process_id"]

# Schedule resume in async context
asyncio.run_coroutine_threadsafe(
asyncio.create_task(
self.resume_process_with_dependencies(
process_id, resume_request.get("interruption_context")
),
asyncio.get_event_loop(),
)
)

# Check for dependency updates
Expand All @@ -363,11 +377,11 @@ def _orchestration_loop(self):
self._cleanup_old_data()

# Sleep before next iteration
time.sleep(1)
await asyncio.sleep(1)

except Exception as e:
logger.error(f"Orchestration loop error: {e}")
time.sleep(5)
await asyncio.sleep(5)

async def _check_dependencies_satisfied(self, process_id: str) -> bool:
"""Check if all dependencies for a process are satisfied"""
Expand Down Expand Up @@ -485,10 +499,16 @@ def _check_dependency_updates(self):

if has_dependencies:
# Check if we should queue this process for resume
asyncio.run_coroutine_threadsafe(
self._maybe_queue_dependent_process(process_id),
asyncio.get_event_loop(),
)
try:
loop = asyncio.get_running_loop()
loop.create_task(
self._maybe_queue_dependent_process(process_id)
)
except RuntimeError:
asyncio.run_coroutine_threadsafe(
self._maybe_queue_dependent_process(process_id),
asyncio.get_event_loop(),
)

async def _maybe_queue_dependent_process(self, process_id: str):
"""Maybe queue a dependent process if its dependencies are now satisfied"""
Expand Down Expand Up @@ -587,9 +607,9 @@ def get_process_status(self, process_id: str) -> Optional[Dict[str, Any]]:
"registered_at": process_info["registered_at"].isoformat(),
"is_active": process_id in self.active_resumes,
"is_queued": process_id in [req["process_id"] for req in self.resume_queue],
"current_progress": current_state.progress_percentage
if current_state
else None,
"current_progress": (
current_state.progress_percentage if current_state else None
),
"metrics": asdict(metrics) if metrics else None,
"dependencies": [asdict(dep) for dep in dependencies],
"resource_requirements": process_info.get("resource_requirements", {}),
Expand Down