Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/skillspector/llm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,16 @@

from __future__ import annotations

import asyncio
import concurrent.futures
from typing import Coroutine, TypeVar

from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import BaseMessage

T = TypeVar("T")

from skillspector.constants import MODEL_CONFIG
from skillspector.model_info import get_max_input_tokens, get_max_output_tokens
from skillspector.providers import (
create_chat_model,
Expand Down Expand Up @@ -106,3 +113,30 @@ def chat_completion(prompt: str, *, model: str | None = None) -> str:
if not isinstance(response, BaseMessage):
raise TypeError(f"Expected BaseMessage from chat model, got {type(response).__name__}")
return str(response.text)


def run_async(coroutine: Coroutine[None, None, T]) -> T:
"""
Run an async coroutine in a synchronous context, even if there's already a running event loop.

This function safely handles nested event loop scenarios (e.g. Jupyter Notebooks, FastAPI,
LangGraph Studio) by offloading the coroutine execution to a separate thread with its own
event loop when a running loop is detected.

Args:
coroutine: The async coroutine to run

Returns:
The result of the coroutine execution

Raises:
Any exception raised by the coroutine is re-raised as-is
"""
try:
return asyncio.run(coroutine)
except RuntimeError as e:
if "This event loop is already running" in str(e):

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Automated SkillSpector Review]

Blocking: calling asyncio.run() from within a running loop raises RuntimeError("asyncio.run() cannot be called from a running event loop"), which does not contain "This event loop is already running". So in the nested case this if is False and the raise below re-fires the error instead of offloading to the thread — the original crash is unfixed. Detect the running loop via asyncio.get_running_loop() instead of substring-matching the message (the test_run_async_with_running_loop assertion of result == 200 cannot pass on CPython 3.8-3.12 with this guard).

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(asyncio.run, coroutine)
return future.result()
raise
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from skillspector.constants import _SKILLSPECTOR_DEFAULT_MODEL, MODEL_CONFIG
from skillspector.llm_analyzer_base import LLMAnalyzerBase
from skillspector.llm_utils import run_async
from skillspector.logging_config import get_logger
from skillspector.state import AnalyzerNodeResponse, SkillspectorState

Expand Down Expand Up @@ -176,7 +177,7 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse:
prompt = ANALYZER_PROMPT.format(manifest_section=_format_manifest(manifest))
analyzer = LLMAnalyzerBase(base_prompt=prompt, model=model)
batches = analyzer.get_batches(sorted(file_cache), file_cache)
results = asyncio.run(analyzer.arun_batches(batches))
results = run_async(analyzer.arun_batches(batches))
findings = analyzer.collect_findings(results)
logger.info("%s: %d findings", ANALYZER_ID, len(findings))
return {"findings": findings}
Expand Down
3 changes: 2 additions & 1 deletion src/skillspector/nodes/analyzers/semantic_quality_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from skillspector.constants import _SKILLSPECTOR_DEFAULT_MODEL
from skillspector.llm_analyzer_base import LLMAnalyzerBase
from skillspector.llm_utils import run_async
from skillspector.logging_config import get_logger
from skillspector.state import AnalyzerNodeResponse, SkillspectorState

Expand Down Expand Up @@ -145,7 +146,7 @@ def node(state: SkillspectorState) -> AnalyzerNodeResponse:
try:
analyzer = LLMAnalyzerBase(base_prompt=ANALYZER_PROMPT, model=model)
batches = analyzer.get_batches(files, file_cache)
results = asyncio.run(analyzer.arun_batches(batches))
results = run_async(analyzer.arun_batches(batches))
findings = analyzer.collect_findings(results)
logger.info("%s: %d findings", ANALYZER_ID, len(findings))
return {"findings": findings}
Expand Down
3 changes: 2 additions & 1 deletion src/skillspector/nodes/meta_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
LLMAnalyzerBase,
estimate_tokens,
)
from skillspector.llm_utils import run_async
from skillspector.logging_config import get_logger
from skillspector.models import Finding
from skillspector.nodes.analyzers.pattern_defaults import (
Expand Down Expand Up @@ -483,7 +484,7 @@ def meta_analyzer(state: SkillspectorState) -> MetaAnalyzerResponse:
model,
)

batch_results = asyncio.run(analyzer.arun_batches(batches, metadata_text=metadata_text))
batch_results = run_async(analyzer.arun_batches(batches, metadata_text=metadata_text))
filtered = analyzer.apply_filter(findings, batch_results)

logger.debug(
Expand Down
45 changes: 45 additions & 0 deletions tests/unit/test_llm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import AIMessage

import asyncio

from skillspector import llm_utils
from skillspector.llm_utils import (
_resolve_llm_credentials,
chat_completion,
fetch_model_token_limits,
get_chat_model,
is_llm_available,
run_async,
)
from skillspector.providers import NO_LLM_API_KEY_MESSAGE, resolve_provider_credentials
from skillspector.providers.nv_build import NvBuildProvider
Expand Down Expand Up @@ -183,6 +186,48 @@ def test_returns_false_with_message_when_no_credentials(self) -> None:
assert msg == NO_LLM_API_KEY_MESSAGE


class TestRunAsync:
"""Tests for run_async helper function that handles nested event loops."""

async def _test_async_function(self, value: int, delay: float = 0) -> int:
"""Simple async function for testing."""
if delay > 0:
await asyncio.sleep(delay)
return value * 2

async def _test_async_function_raises(self) -> None:
"""Async function that raises an exception for testing."""
raise ValueError("Test exception")

def test_run_async_without_running_loop(self) -> None:
"""Test run_async works correctly when there is no running event loop."""
result = run_async(self._test_async_function(42))
assert result == 84

def test_run_async_with_running_loop(self) -> None:
"""Test run_async works correctly even when there is already a running event loop.

This regression test covers the scenario where SkillSpector is invoked from
environments like Jupyter Notebooks, FastAPI, or LangGraph Studio that already
have an active event loop.
"""
async def _test_in_running_loop() -> int:
# Call run_async from within an already running event loop
return run_async(self._test_async_function(100))

# Use asyncio.run to create a running loop context
result = asyncio.run(_test_in_running_loop())
assert result == 200

def test_run_async_propagates_exceptions(self) -> None:
"""Test exceptions from async functions are properly propagated."""
with pytest.raises(ValueError, match="Test exception"):
run_async(self._test_async_function_raises())

def test_run_async_with_delay(self) -> None:
"""Test run_async correctly handles async functions with await calls."""
result = run_async(self._test_async_function(5, delay=0.01))
assert result == 10
class TestGetChatModel:
def test_openai_fallback_uses_openai_default_model(
self, monkeypatch: pytest.MonkeyPatch
Expand Down