fix(async): handle nested event loops in asyncio.run calls#117
fix(async): handle nested event loops in asyncio.run calls#117zhenliemao wants to merge 3 commits into
Conversation
This fix allows SkillSpector to run in environments that already have a running event loop, such as: - Jupyter Notebooks - LangGraph Studio - FastAPI applications - Any programmatic usage within async code The fix detects if there's already a running loop and uses run_until_complete() instead of throwing a RuntimeError. This prevents silent fallback to unfiltered static findings. Fixes NVIDIA#108 Signed-off-by: GitHub User <494822673@qq.com>
rng1995
left a comment
There was a problem hiding this comment.
REQUEST_CHANGES — the nested-event-loop handling is inverted and will not fix the scenario it targets.
try:
loop = asyncio.get_running_loop()
results = loop.run_until_complete(analyzer.arun_batches(batches))
except RuntimeError:
results = asyncio.run(analyzer.arun_batches(batches))asyncio.get_running_loop() only returns a loop when one is already running, and loop.run_until_complete() cannot be called on a running loop — it raises RuntimeError: This event loop is already running. So in the nested case (Jupyter/FastAPI/LangGraph) control falls into the except, which then calls asyncio.run() — which also raises RuntimeError: asyncio.run() cannot be called from a running event loop, and that one is no longer caught. Net result: it still crashes in exactly the situation this PR is meant to handle. In the normal (no running loop) case it behaves the same as before, so the change provides no benefit.
A working approach is to detect a running loop and offload to a separate thread, e.g.:
import concurrent.futures
try:
asyncio.get_running_loop()
except RuntimeError:
results = asyncio.run(analyzer.arun_batches(batches))
else:
with concurrent.futures.ThreadPoolExecutor(1) as pool:
results = pool.submit(lambda: asyncio.run(analyzer.arun_batches(batches))).result()(Alternatively use nest_asyncio, or make the call sites async.) Please also add a regression test that invokes one of these nodes from within a running event loop and asserts it returns results instead of raising — that would catch the current behavior. The same pattern is duplicated across all three files, so a shared helper would avoid repeating the fix.
- Extract shared run_async function to llm_utils to avoid code duplication - Handle nested event loops safely by running async code in separate thread - Add comprehensive regression tests for nested loop scenario - Replace all duplicated asyncio.run/run_until_complete patterns
| try: | ||
| return asyncio.run(coroutine) | ||
| except RuntimeError as e: | ||
| if "This event loop is already running" in str(e): |
There was a problem hiding this comment.
[Automated SkillSpector Review]
Blocking: calling asyncio.run() from within a running loop raises RuntimeError("asyncio.run() cannot be called from a running event loop"), which does not contain "This event loop is already running". So in the nested case this if is False and the raise below re-fires the error instead of offloading to the thread — the original crash is unfixed. Detect the running loop via asyncio.get_running_loop() instead of substring-matching the message (the test_run_async_with_running_loop assertion of result == 200 cannot pass on CPython 3.8-3.12 with this guard).
rng1995
left a comment
There was a problem hiding this comment.
[Automated SkillSpector Review]
Re-review: still requesting changes — the nested-loop fix is still broken.
Refactoring the duplicated logic into a shared run_async helper is the right structure, but the helper does not actually handle the nested-loop case it targets:
try:
return asyncio.run(coroutine)
except RuntimeError as e:
if "This event loop is already running" in str(e):
... # offload to thread
raiseWhen asyncio.run() is called from inside a running loop (Jupyter / FastAPI / LangGraph — exactly the target scenario), CPython raises RuntimeError("asyncio.run() cannot be called from a running event loop"). That message does not contain "This event loop is already running" (that string is what loop.run_until_complete() raises), so the if is False and the helper re-raises instead of offloading to the thread. I reproduced this on CPython 3.12:
nested (running loop): caught RuntimeError: 'asyncio.run() cannot be called from a running event loop'
FAILED -> RuntimeError 'asyncio.run() cannot be called from a running event loop'
This also means test_run_async_with_running_loop (which asserts result == 200) cannot pass on CPython 3.8-3.12 — please double-check that the suite actually ran green.
Fix: detect the running loop instead of matching a message substring, e.g.
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coroutine) # no running loop
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as ex:
return ex.submit(asyncio.run, coroutine).result()Minor: the from skillspector.constants import MODEL_CONFIG import sits after the T = TypeVar("T") statement (module-level import not at top — E402) and appears unused; please remove/relocate it.
Summary
This fix allows SkillSpector to run in environments that already have a running event loop, preventing RuntimeError when asyncio.run() is called from within an existing loop.
Problem
When running SkillSpector in environments like:
The call to asyncio.run() throws a
RuntimeError: This event loop is already runningand falls back to unfiltered static findings, silently disabling LLM analysis.Solution
Added detection for running event loops:
loop.run_until_complete()to execute the async taskasyncio.run()This maintains backward compatibility while supporting nested loop environments.
Testing
Impact
Fixes #108