diff --git a/.Jules/bolt.md b/.Jules/bolt.md index b507f81f..e222d377 100644 --- a/.Jules/bolt.md +++ b/.Jules/bolt.md @@ -1,2 +1,3 @@ ## 2024-03-28 - Initial Bolt Run | Learning: The codebase is primarily Python, not TS/JS/Astro | Action: Adjust search queries to look for Python performance hotspots, such as nested loops, N+1 queries, unnecessary list comprehensions, or missing caching/memoization. ## 2024-03-28 - Optimize list comprehensions | Learning: Nested or repeated list comprehensions on large arrays cause unnecessary overhead in Python | Action: Consolidate repeated iterations into a single O(N) loop. +## 2025-02-28 - Async Orchestration Loop Refactoring | Learning: Replacing blocking `time.sleep` calls on a dedicated OS thread with `await asyncio.sleep` inside the orchestrator loop improves performance by eliminating an unnecessarily blocked thread and allowing other tasks to be scheduled concurrently on the main event loop. | Action: Ensure background orchestration logic uses `async def` and `await asyncio.sleep` rather than `threading.Thread` with `time.sleep` when it interacts directly with async dependencies and queue management. 
diff --git a/infrastructure/distributed/resume_orchestrator.py b/infrastructure/distributed/resume_orchestrator.py index 9c0bba73..7d441d12 100644 --- a/infrastructure/distributed/resume_orchestrator.py +++ b/infrastructure/distributed/resume_orchestrator.py @@ -142,11 +142,22 @@ def start(self): self.resume_engine.start() self.orchestrator_active = True - # Start orchestration thread - self.orchestrator_thread = threading.Thread( - target=self._orchestration_loop, daemon=True - ) - self.orchestrator_thread.start() + # Start orchestration task or thread + try: + loop = asyncio.get_running_loop() + self.orchestrator_task = loop.create_task(self._orchestration_loop()) + self.orchestrator_thread = None + except RuntimeError: + + def run_loop(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(self._orchestration_loop()) + loop.close() + + self.orchestrator_thread = threading.Thread(target=run_loop, daemon=True) + self.orchestrator_thread.start() + self.orchestrator_task = None logger.info("Resume orchestrator started") @@ -154,7 +165,9 @@ def stop(self): """Stop the resume orchestrator""" self.orchestrator_active = False - if self.orchestrator_thread: + if hasattr(self, "orchestrator_task") and self.orchestrator_task: + self.orchestrator_task.cancel() + if hasattr(self, "orchestrator_thread") and self.orchestrator_thread: self.orchestrator_thread.join(timeout=10) self.resume_engine.stop() @@ -317,9 +330,11 @@ async def resume_process_with_dependencies( "success": success, "resume_time_seconds": resume_time, "timestamp": datetime.utcnow().isoformat(), - "interruption_type": interruption_context.interruption_type.value - if interruption_context - else None, + "interruption_type": ( + interruption_context.interruption_type.value + if interruption_context + else None + ), } ) @@ -334,7 +349,7 @@ async def resume_process_with_dependencies( self._release_resources(process_id) self.active_resumes.discard(process_id) - def 
_orchestration_loop(self): + async def _orchestration_loop(self): """Main orchestration loop running in background thread""" while self.orchestrator_active: @@ -349,11 +364,10 @@ def _orchestration_loop(self): process_id = resume_request["process_id"] # Schedule resume in async context - asyncio.run_coroutine_threadsafe( + asyncio.create_task( self.resume_process_with_dependencies( process_id, resume_request.get("interruption_context") - ), - asyncio.get_event_loop(), + ) ) # Check for dependency updates @@ -363,11 +377,11 @@ def _orchestration_loop(self): self._cleanup_old_data() # Sleep before next iteration - time.sleep(1) + await asyncio.sleep(1) except Exception as e: logger.error(f"Orchestration loop error: {e}") - time.sleep(5) + await asyncio.sleep(5) async def _check_dependencies_satisfied(self, process_id: str) -> bool: """Check if all dependencies for a process are satisfied""" @@ -485,10 +499,16 @@ def _check_dependency_updates(self): if has_dependencies: # Check if we should queue this process for resume - asyncio.run_coroutine_threadsafe( - self._maybe_queue_dependent_process(process_id), - asyncio.get_event_loop(), - ) + try: + loop = asyncio.get_running_loop() + loop.create_task( + self._maybe_queue_dependent_process(process_id) + ) + except RuntimeError: + asyncio.run_coroutine_threadsafe( + self._maybe_queue_dependent_process(process_id), + asyncio.get_event_loop(), + ) async def _maybe_queue_dependent_process(self, process_id: str): """Maybe queue a dependent process if its dependencies are now satisfied""" @@ -587,9 +607,9 @@ def get_process_status(self, process_id: str) -> Optional[Dict[str, Any]]: "registered_at": process_info["registered_at"].isoformat(), "is_active": process_id in self.active_resumes, "is_queued": process_id in [req["process_id"] for req in self.resume_queue], - "current_progress": current_state.progress_percentage - if current_state - else None, + "current_progress": ( + current_state.progress_percentage if current_state 
else None + ), "metrics": asdict(metrics) if metrics else None, "dependencies": [asdict(dep) for dep in dependencies], "resource_requirements": process_info.get("resource_requirements", {}),