diff --git a/src/skillspector/llm_utils.py b/src/skillspector/llm_utils.py index d1c5104..6bc1438 100644 --- a/src/skillspector/llm_utils.py +++ b/src/skillspector/llm_utils.py @@ -30,6 +30,10 @@ from __future__ import annotations +import asyncio +import concurrent.futures +from typing import Coroutine, Any + from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import BaseMessage @@ -106,3 +110,29 @@ def chat_completion(prompt: str, *, model: str | None = None) -> str: if not isinstance(response, BaseMessage): raise TypeError(f"Expected BaseMessage from chat model, got {type(response).__name__}") return str(response.text) + + +def run_async(coroutine: Coroutine) -> Any: + """ + Run an async coroutine in a synchronous context, even if there's already a running event loop. + + This function safely handles nested event loop scenarios (e.g. Jupyter Notebooks, FastAPI, + LangGraph Studio) by offloading the coroutine execution to a separate thread with its own + event loop when a running loop is detected. + + Args: + coroutine: The async coroutine to run + + Returns: + The result of the coroutine execution + + Raises: + Any exception raised by the coroutine is re-raised as-is + """ + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(coroutine) + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + return executor.submit(asyncio.run, coroutine).result() diff --git a/src/skillspector/nodes/analyzers/semantic_developer_intent.py b/src/skillspector/nodes/analyzers/semantic_developer_intent.py index a3a54be..4481365 100644 --- a/src/skillspector/nodes/analyzers/semantic_developer_intent.py +++ b/src/skillspector/nodes/analyzers/semantic_developer_intent.py @@ -26,6 +26,7 @@ from skillspector.constants import _SKILLSPECTOR_DEFAULT_MODEL, MODEL_CONFIG from skillspector.llm_analyzer_base import LLMAnalyzerBase +from skillspector.llm_utils import run_async from skillspector.logging_config import get_logger from skillspector.state import AnalyzerNodeResponse, SkillspectorState @@ -176,7 +177,7 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse: prompt = ANALYZER_PROMPT.format(manifest_section=_format_manifest(manifest)) analyzer = LLMAnalyzerBase(base_prompt=prompt, model=model) batches = analyzer.get_batches(sorted(file_cache), file_cache) - results = asyncio.run(analyzer.arun_batches(batches)) + results = run_async(analyzer.arun_batches(batches)) findings = analyzer.collect_findings(results) logger.info("%s: %d findings", ANALYZER_ID, len(findings)) return {"findings": findings} diff --git a/src/skillspector/nodes/analyzers/semantic_quality_policy.py b/src/skillspector/nodes/analyzers/semantic_quality_policy.py index 3140334..ca85691 100644 --- a/src/skillspector/nodes/analyzers/semantic_quality_policy.py +++ b/src/skillspector/nodes/analyzers/semantic_quality_policy.py @@ -26,6 +26,7 @@ from skillspector.constants import _SKILLSPECTOR_DEFAULT_MODEL from skillspector.llm_analyzer_base import LLMAnalyzerBase +from skillspector.llm_utils import run_async from skillspector.logging_config import get_logger from skillspector.state import AnalyzerNodeResponse, SkillspectorState @@ -145,7 +146,7 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse: try: analyzer = LLMAnalyzerBase(base_prompt=ANALYZER_PROMPT, model=model) batches = analyzer.get_batches(files, file_cache) - results = asyncio.run(analyzer.arun_batches(batches)) + results = run_async(analyzer.arun_batches(batches)) findings = analyzer.collect_findings(results) logger.info("%s: %d findings", ANALYZER_ID, len(findings)) return {"findings": findings} diff --git a/src/skillspector/nodes/meta_analyzer.py b/src/skillspector/nodes/meta_analyzer.py index d3cef69..3f4c278 100644 --- a/src/skillspector/nodes/meta_analyzer.py +++ b/src/skillspector/nodes/meta_analyzer.py @@ -33,6 +33,7 @@ LLMAnalyzerBase, estimate_tokens, ) +from skillspector.llm_utils import run_async from skillspector.logging_config import get_logger from skillspector.models import Finding from skillspector.nodes.analyzers.pattern_defaults import ( @@ -532,7 +533,7 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse: model, ) - batch_results = asyncio.run(analyzer.arun_batches(batches, metadata_text=metadata_text)) + batch_results = run_async(analyzer.arun_batches(batches, metadata_text=metadata_text)) if len(batch_results) < len(batches): # Some batches never returned. A finding the LLM never saw has no diff --git a/tests/unit/test_llm_utils.py b/tests/unit/test_llm_utils.py index 18a1a7f..11a8cba 100644 --- a/tests/unit/test_llm_utils.py +++ b/tests/unit/test_llm_utils.py @@ -26,6 +26,8 @@ from langchain_anthropic import ChatAnthropic from langchain_core.messages import AIMessage +import asyncio + from skillspector import llm_utils from skillspector.llm_utils import ( _resolve_llm_credentials, @@ -33,6 +35,7 @@ fetch_model_token_limits, get_chat_model, is_llm_available, + run_async, ) from skillspector.providers import NO_LLM_API_KEY_MESSAGE, resolve_provider_credentials from skillspector.providers.nv_build import NvBuildProvider @@ -183,6 +186,48 @@ def test_returns_false_with_message_when_no_credentials(self) -> None: assert msg == NO_LLM_API_KEY_MESSAGE +class TestRunAsync: + """Tests for run_async helper function that handles nested event loops.""" + + async def _test_async_function(self, value: int, delay: float = 0) -> int: + """Simple async function for testing.""" + if delay > 0: + await asyncio.sleep(delay) + return value * 2 + + async def _test_async_function_raises(self) -> None: + """Async function that raises an exception for testing.""" + raise ValueError("Test exception") + + def test_run_async_without_running_loop(self) -> None: + """Test run_async works correctly when there is no running event loop.""" + result = run_async(self._test_async_function(42)) + assert result == 84 + + def test_run_async_with_running_loop(self) -> None: + """Test run_async works correctly even when there is already a running event loop. + + This regression test covers the scenario where SkillSpector is invoked from + environments like Jupyter Notebooks, FastAPI, or LangGraph Studio that already + have an active event loop. + """ + async def _test_in_running_loop() -> int: + # Call run_async from within an already running event loop + return run_async(self._test_async_function(100)) + + # Use asyncio.run to create a running loop context + result = asyncio.run(_test_in_running_loop()) + assert result == 200 + + def test_run_async_propagates_exceptions(self) -> None: + """Test exceptions from async functions are properly propagated.""" + with pytest.raises(ValueError, match="Test exception"): + run_async(self._test_async_function_raises()) + + def test_run_async_with_delay(self) -> None: + """Test run_async correctly handles async functions with await calls.""" + result = run_async(self._test_async_function(5, delay=0.01)) + assert result == 10 class TestGetChatModel: def test_openai_fallback_uses_openai_default_model( self, monkeypatch: pytest.MonkeyPatch