From f4ed3cb1b8b4e1eef9cdb020cda7f47abb59625f Mon Sep 17 00:00:00 2001 From: GitHub User <494822673@qq.com> Date: Sat, 20 Jun 2026 17:16:09 +0800 Subject: [PATCH 1/4] fix(async): handle nested event loops in asyncio.run calls This fix allows SkillSpector to run in environments that already have a running event loop, such as: - Jupyter Notebooks - LangGraph Studio - FastAPI applications - Any programmatic usage within async code The fix detects if there's already a running loop and uses run_until_complete() instead of throwing a RuntimeError. This prevents silent fallback to unfiltered static findings. Fixes #108 Signed-off-by: GitHub User <494822673@qq.com> --- .../nodes/analyzers/semantic_developer_intent.py | 6 +++++- src/skillspector/nodes/analyzers/semantic_quality_policy.py | 6 +++++- src/skillspector/nodes/meta_analyzer.py | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/skillspector/nodes/analyzers/semantic_developer_intent.py b/src/skillspector/nodes/analyzers/semantic_developer_intent.py index a3a54be..a23a71b 100644 --- a/src/skillspector/nodes/analyzers/semantic_developer_intent.py +++ b/src/skillspector/nodes/analyzers/semantic_developer_intent.py @@ -176,7 +176,11 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse: prompt = ANALYZER_PROMPT.format(manifest_section=_format_manifest(manifest)) analyzer = LLMAnalyzerBase(base_prompt=prompt, model=model) batches = analyzer.get_batches(sorted(file_cache), file_cache) - results = asyncio.run(analyzer.arun_batches(batches)) + try: + loop = asyncio.get_running_loop() + results = loop.run_until_complete(analyzer.arun_batches(batches)) + except RuntimeError: + results = asyncio.run(analyzer.arun_batches(batches)) findings = analyzer.collect_findings(results) logger.info("%s: %d findings", ANALYZER_ID, len(findings)) return {"findings": findings} diff --git a/src/skillspector/nodes/analyzers/semantic_quality_policy.py b/src/skillspector/nodes/analyzers/semantic_quality_policy.py index 3140334..92fede8 100644 --- a/src/skillspector/nodes/analyzers/semantic_quality_policy.py +++ b/src/skillspector/nodes/analyzers/semantic_quality_policy.py @@ -145,7 +145,11 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse: try: analyzer = LLMAnalyzerBase(base_prompt=ANALYZER_PROMPT, model=model) batches = analyzer.get_batches(files, file_cache) - results = asyncio.run(analyzer.arun_batches(batches)) + try: + loop = asyncio.get_running_loop() + results = loop.run_until_complete(analyzer.arun_batches(batches)) + except RuntimeError: + results = asyncio.run(analyzer.arun_batches(batches)) findings = analyzer.collect_findings(results) logger.info("%s: %d findings", ANALYZER_ID, len(findings)) return {"findings": findings} diff --git a/src/skillspector/nodes/meta_analyzer.py b/src/skillspector/nodes/meta_analyzer.py index 8f2b541..ebb949c 100644 --- a/src/skillspector/nodes/meta_analyzer.py +++ b/src/skillspector/nodes/meta_analyzer.py @@ -391,7 +391,11 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse: model, ) - batch_results = asyncio.run(analyzer.arun_batches(batches, metadata_text=metadata_text)) + try: + loop = asyncio.get_running_loop() + batch_results = loop.run_until_complete(analyzer.arun_batches(batches, metadata_text=metadata_text)) + except RuntimeError: + batch_results = asyncio.run(analyzer.arun_batches(batches, metadata_text=metadata_text)) filtered = analyzer.apply_filter(findings, batch_results) logger.debug( From b5ed19306ca50417af625a4af715a9cf0373daa2 Mon Sep 17 00:00:00 2001 From: zhenliemao <494822673@qq.com> Date: Tue, 23 Jun 2026 20:16:21 +0800 Subject: [PATCH 2/4] Refactor async loop handling with shared run_async helper - Extract shared run_async function to llm_utils to avoid code duplication - Handle nested event loops safely by running async code in separate thread - Add comprehensive regression tests for nested loop scenario - Replace all duplicated asyncio.run/run_until_complete patterns --- src/skillspector/llm_utils.py | 33 +++++++++++++ .../analyzers/semantic_developer_intent.py | 7 +-- .../analyzers/semantic_quality_policy.py | 7 +-- src/skillspector/nodes/meta_analyzer.py | 7 +-- tests/unit/test_llm_utils.py | 47 +++++++++++++++++++ 5 files changed, 86 insertions(+), 15 deletions(-) diff --git a/src/skillspector/llm_utils.py b/src/skillspector/llm_utils.py index ab6e551..b53734f 100644 --- a/src/skillspector/llm_utils.py +++ b/src/skillspector/llm_utils.py @@ -30,9 +30,15 @@ from __future__ import annotations +import asyncio +import concurrent.futures +from typing import Coroutine, TypeVar + from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import BaseMessage +T = TypeVar("T") + from skillspector.constants import MODEL_CONFIG from skillspector.model_info import get_max_input_tokens, get_max_output_tokens from skillspector.providers import ( @@ -92,3 +98,30 @@ def chat_completion(prompt: str, *, model: str | None = None) -> str: if not isinstance(response, BaseMessage): raise TypeError(f"Expected BaseMessage from chat model, got {type(response).__name__}") return str(response.text) + + +def run_async(coroutine: Coroutine[None, None, T]) -> T: + """ + Run an async coroutine in a synchronous context, even if there's already a running event loop. + + This function safely handles nested event loop scenarios (e.g. Jupyter Notebooks, FastAPI, + LangGraph Studio) by offloading the coroutine execution to a separate thread with its own + event loop when a running loop is detected. + + Args: + coroutine: The async coroutine to run + + Returns: + The result of the coroutine execution + + Raises: + Any exception raised by the coroutine is re-raised as-is + """ + try: + return asyncio.run(coroutine) + except RuntimeError as e: + if "This event loop is already running" in str(e): + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(asyncio.run, coroutine) + return future.result() + raise diff --git a/src/skillspector/nodes/analyzers/semantic_developer_intent.py b/src/skillspector/nodes/analyzers/semantic_developer_intent.py index a23a71b..4481365 100644 --- a/src/skillspector/nodes/analyzers/semantic_developer_intent.py +++ b/src/skillspector/nodes/analyzers/semantic_developer_intent.py @@ -26,6 +26,7 @@ from skillspector.constants import _SKILLSPECTOR_DEFAULT_MODEL, MODEL_CONFIG from skillspector.llm_analyzer_base import LLMAnalyzerBase +from skillspector.llm_utils import run_async from skillspector.logging_config import get_logger from skillspector.state import AnalyzerNodeResponse, SkillspectorState @@ -176,11 +177,7 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse: prompt = ANALYZER_PROMPT.format(manifest_section=_format_manifest(manifest)) analyzer = LLMAnalyzerBase(base_prompt=prompt, model=model) batches = analyzer.get_batches(sorted(file_cache), file_cache) - try: - loop = asyncio.get_running_loop() - results = loop.run_until_complete(analyzer.arun_batches(batches)) - except RuntimeError: - results = asyncio.run(analyzer.arun_batches(batches)) + results = run_async(analyzer.arun_batches(batches)) findings = analyzer.collect_findings(results) logger.info("%s: %d findings", ANALYZER_ID, len(findings)) return {"findings": findings} diff --git a/src/skillspector/nodes/analyzers/semantic_quality_policy.py b/src/skillspector/nodes/analyzers/semantic_quality_policy.py index 92fede8..ca85691 100644 --- a/src/skillspector/nodes/analyzers/semantic_quality_policy.py +++ b/src/skillspector/nodes/analyzers/semantic_quality_policy.py @@ -26,6 +26,7 @@ from skillspector.constants import _SKILLSPECTOR_DEFAULT_MODEL from skillspector.llm_analyzer_base import LLMAnalyzerBase +from skillspector.llm_utils import run_async from skillspector.logging_config import get_logger from skillspector.state import AnalyzerNodeResponse, SkillspectorState @@ -145,11 +146,7 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse: try: analyzer = LLMAnalyzerBase(base_prompt=ANALYZER_PROMPT, model=model) batches = analyzer.get_batches(files, file_cache) - try: - loop = asyncio.get_running_loop() - results = loop.run_until_complete(analyzer.arun_batches(batches)) - except RuntimeError: - results = asyncio.run(analyzer.arun_batches(batches)) + results = run_async(analyzer.arun_batches(batches)) findings = analyzer.collect_findings(results) logger.info("%s: %d findings", ANALYZER_ID, len(findings)) return {"findings": findings} diff --git a/src/skillspector/nodes/meta_analyzer.py b/src/skillspector/nodes/meta_analyzer.py index ebb949c..bd9c7fa 100644 --- a/src/skillspector/nodes/meta_analyzer.py +++ b/src/skillspector/nodes/meta_analyzer.py @@ -33,6 +33,7 @@ LLMAnalyzerBase, estimate_tokens, ) +from skillspector.llm_utils import run_async from skillspector.logging_config import get_logger from skillspector.models import Finding from skillspector.nodes.analyzers.pattern_defaults import ( @@ -391,11 +392,7 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse: model, ) - try: - loop = asyncio.get_running_loop() - batch_results = loop.run_until_complete(analyzer.arun_batches(batches, metadata_text=metadata_text)) - except RuntimeError: - batch_results = asyncio.run(analyzer.arun_batches(batches, metadata_text=metadata_text)) + batch_results = run_async(analyzer.arun_batches(batches, metadata_text=metadata_text)) filtered = analyzer.apply_filter(findings, batch_results) logger.debug( diff --git a/tests/unit/test_llm_utils.py b/tests/unit/test_llm_utils.py index 5e89ead..104d162 100644 --- a/tests/unit/test_llm_utils.py +++ b/tests/unit/test_llm_utils.py @@ -26,6 +26,8 @@ from langchain_anthropic import ChatAnthropic from langchain_core.messages import AIMessage +import asyncio + from skillspector import llm_utils from skillspector.llm_utils import ( _resolve_llm_credentials, @@ -33,6 +35,7 @@ fetch_model_token_limits, get_chat_model, is_llm_available, + run_async, ) from skillspector.providers import NO_LLM_API_KEY_MESSAGE, resolve_provider_credentials @@ -178,3 +181,47 @@ def test_returns_false_with_message_when_no_credentials(self) -> None: ok, msg = is_llm_available() assert ok is False assert msg == NO_LLM_API_KEY_MESSAGE + + +class TestRunAsync: + """Tests for run_async helper function that handles nested event loops.""" + + async def _test_async_function(self, value: int, delay: float = 0) -> int: + """Simple async function for testing.""" + if delay > 0: + await asyncio.sleep(delay) + return value * 2 + + async def _test_async_function_raises(self) -> None: + """Async function that raises an exception for testing.""" + raise ValueError("Test exception") + + def test_run_async_without_running_loop(self) -> None: + """Test run_async works correctly when there is no running event loop.""" + result = run_async(self._test_async_function(42)) + assert result == 84 + + def test_run_async_with_running_loop(self) -> None: + """Test run_async works correctly even when there is already a running event loop. + + This regression test covers the scenario where SkillSpector is invoked from + environments like Jupyter Notebooks, FastAPI, or LangGraph Studio that already + have an active event loop. + """ + async def _test_in_running_loop() -> int: + # Call run_async from within an already running event loop + return run_async(self._test_async_function(100)) + + # Use asyncio.run to create a running loop context + result = asyncio.run(_test_in_running_loop()) + assert result == 200 + + def test_run_async_propagates_exceptions(self) -> None: + """Test exceptions from async functions are properly propagated.""" + with pytest.raises(ValueError, match="Test exception"): + run_async(self._test_async_function_raises()) + + def test_run_async_with_delay(self) -> None: + """Test run_async correctly handles async functions with await calls.""" + result = run_async(self._test_async_function(5, delay=0.01)) + assert result == 10 From 2179f4cdda9ad001bd4fe82d39302ac322392296 Mon Sep 17 00:00:00 2001 From: zhenliemao <494822673@qq.com> Date: Thu, 25 Jun 2026 10:15:17 +0800 Subject: [PATCH 3/4] Fix run_async nested loop detection per review --- src/skillspector/llm_utils.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/skillspector/llm_utils.py b/src/skillspector/llm_utils.py index d3fd8a6..6bc1438 100644 --- a/src/skillspector/llm_utils.py +++ b/src/skillspector/llm_utils.py @@ -32,14 +32,11 @@ import asyncio import concurrent.futures -from typing import Coroutine, TypeVar +from typing import Coroutine, Any from langchain_core.language_models.chat_models import BaseChatModel from langchain_core.messages import BaseMessage -T = TypeVar("T") - -from skillspector.constants import MODEL_CONFIG from skillspector.model_info import get_max_input_tokens, get_max_output_tokens from skillspector.providers import ( create_chat_model, @@ -115,7 +112,7 @@ def chat_completion(prompt: str, *, model: str | None = None) -> str: return str(response.text) -def run_async(coroutine: Coroutine[None, None, T]) -> T: +def run_async(coroutine: Coroutine) -> Any: """ Run an async coroutine in a synchronous context, even if there's already a running event loop. @@ -133,10 +130,9 @@ def run_async(coroutine: Coroutine[None, None, T]) -> T: Any exception raised by the coroutine is re-raised as-is """ try: + asyncio.get_running_loop() + except RuntimeError: return asyncio.run(coroutine) - except RuntimeError as e: - if "This event loop is already running" in str(e): - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(asyncio.run, coroutine) - return future.result() - raise + + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + return executor.submit(asyncio.run, coroutine).result() From 9736e84462d49b047810b381293a30f8fad8548e Mon Sep 17 00:00:00 2001 From: zhenliemao <494822673@qq.com> Date: Thu, 25 Jun 2026 10:21:47 +0800 Subject: [PATCH 4/4] Resolve merge conflict with main --- src/skillspector/nodes/meta_analyzer.py | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/skillspector/nodes/meta_analyzer.py b/src/skillspector/nodes/meta_analyzer.py index 6e13be0..15eb61b 100644 --- a/src/skillspector/nodes/meta_analyzer.py +++ b/src/skillspector/nodes/meta_analyzer.py @@ -485,7 +485,28 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse: ) batch_results = run_async(analyzer.arun_batches(batches, metadata_text=metadata_text)) - filtered = analyzer.apply_filter(findings, batch_results) + + if len(batch_results) < len(batches): + # Some batches never returned. A finding the LLM never saw has no + # verdict — keep it via the fallback path instead of letting + # apply_filter treat the missing confirmation as a rejection. + analysed_ids = {id(f) for batch, _ in batch_results for f in batch.findings} + analysed = [f for f in findings if id(f) in analysed_ids] + unanalysed = [f for f in findings if id(f) not in analysed_ids] + else: + analysed, unanalysed = findings, [] + + filtered = analyzer.apply_filter(analysed, batch_results) + if unanalysed: + logger.warning( + "Meta-analyzer: %d/%d batches failed; keeping %d findings in %d " + "files unfiltered (no LLM verdict)", + len(batches) - len(batch_results), + len(batches), + len(unanalysed), + len({f.file for f in unanalysed}), + ) + filtered.extend(_fallback_filtered(unanalysed)) logger.debug( "LLM filtering done: %d findings -> %d after filter",