From 4336c9b85dba871e89aa8cfeb1c4c820c951795c Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Thu, 11 Jun 2026 22:41:20 -0400 Subject: [PATCH 1/8] Created retry tests for sync and async calls --- tests/nodes/test_llm_analyzer_base.py | 85 +++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/tests/nodes/test_llm_analyzer_base.py b/tests/nodes/test_llm_analyzer_base.py index 9899a7f..6bb68c2 100644 --- a/tests/nodes/test_llm_analyzer_base.py +++ b/tests/nodes/test_llm_analyzer_base.py @@ -1225,3 +1225,88 @@ def test_unknown_model_uses_default(self) -> None: out = get_max_output_tokens("unknown/model") assert inp == int(mocked_ctx * 0.75) assert out == int(mocked_ctx * 0.25) + + +# --------------------------------------------------------------------------- +# Rate limit retry tests +# --------------------------------------------------------------------------- + + +class TestRateLimitRetry: + MODEL = "nvidia/openai/gpt-oss-120b" + + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + def test_run_batches_retries_on_429(self) -> None: + """Verify sync batch processing retries when 429 is raised.""" + calls = 0 + original_response = LLMAnalysisResult( + findings=[LLMFinding(rule_id="T-1", message="hit", severity="LOW", start_line=1)] + ) + + def flaky_invoke(prompt: str) -> LLMAnalysisResult: + nonlocal calls + calls += 1 + if calls == 1: + raise Exception("429 Too Many Requests") + return original_response + + analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL) + analyzer._structured_llm.invoke = flaky_invoke + batch = Batch(file_path="a.py", content="code") + + results = analyzer.run_batches([batch]) + assert len(results) == 1 + assert results[0][1][0].rule_id == "T-1" + assert calls == 2 + + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + async def test_arun_batches_retries_on_429(self) -> None: + """Verify async batch processing retries when 429 is raised.""" + calls = 0 + original_response = LLMAnalysisResult( + findings=[LLMFinding(rule_id="T-1", message="hit", severity="LOW", start_line=1)] + ) + + async def flaky_ainvoke(prompt: str) -> LLMAnalysisResult: + nonlocal calls + calls += 1 + if calls == 1: + raise Exception("429 Too Many Requests") + return original_response + + analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL) + analyzer._structured_llm.ainvoke = flaky_ainvoke + batch = Batch(file_path="a.py", content="code") + + results = await analyzer.arun_batches([batch]) + assert len(results) == 1 + assert results[0][1][0].rule_id == "T-1" + assert calls == 2 + + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + def test_run_batches_fails_after_max_retries(self) -> None: + """Verify sync processing fails after exhausting retries.""" + + def always_fails(prompt: str) -> LLMAnalysisResult: + raise Exception("429 Too Many Requests") + + analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL) + analyzer._structured_llm.invoke = always_fails + batch = Batch(file_path="a.py", content="code") + + with pytest.raises(Exception, match="429"): + analyzer.run_batches([batch]) + + @patch(MOCK_PATCH_TARGET, _mock_get_chat_model) + async def test_arun_batches_fails_after_max_retries(self) -> None: + """Verify async processing fails after exhausting retries.""" + + async def always_fails(prompt: str) -> LLMAnalysisResult: + raise Exception("429 Too Many Requests") + + analyzer = LLMAnalyzerBase(base_prompt="test", model=self.MODEL) + analyzer._structured_llm.ainvoke = always_fails + batch = Batch(file_path="a.py", content="code") + + with pytest.raises(Exception, match="429"): + await analyzer.arun_batches([batch]) From 11492ad805bfab7f461aafc45d64f1b2761833e9 Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Thu, 11 Jun 2026 22:43:22 -0400 Subject: [PATCH 2/8] Updated documentation to include SKILLSPECTOR_MAX_CONCURRENCY environment variable --- docs/DEVELOPMENT.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/DEVELOPMENT.md b/docs/DEVELOPMENT.md index 0795f09..433f6a0 100644 --- a/docs/DEVELOPMENT.md +++ b/docs/DEVELOPMENT.md @@ -266,6 +266,7 @@ Copy [.env.example](../.env.example) to `.env` in the project root and set value | `OPENAI_BASE_URL` | Override the OpenAI endpoint (e.g. point at Ollama). | `http://localhost:11434/v1` | | `ANTHROPIC_API_KEY` | Credential for `SKILLSPECTOR_PROVIDER=anthropic`. | `sk-ant-...` | | `SKILLSPECTOR_MODEL` | Override the active provider's bundled default model (see [README.md](../README.md) for per-provider defaults). | `gpt-5.2` | +| `SKILLSPECTOR_MAX_CONCURRENCY` | Maximum concurrent LLM requests. Defaults to 5 if unset. Lower values reduce rate limits but increase scan time. | `7` | ### Constants, token budgets, and LLM From 97bbfb1857f9796ff9469e94a45eba85006a6a9e Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Thu, 11 Jun 2026 22:45:28 -0400 Subject: [PATCH 3/8] fix(llm): retry transient failures and make concurrency configurable --- .env.example | 2 ++ src/skillspector/llm_analyzer_base.py | 20 +++++++---- src/skillspector/llm_utils.py | 52 +++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 6 deletions(-) diff --git a/.env.example b/.env.example index 24ad4f5..07058f2 100644 --- a/.env.example +++ b/.env.example @@ -26,6 +26,8 @@ SKILLSPECTOR_MODEL= # leave empty to # SKILLSPECTOR_MODEL_REGISTRY=./model_registry.yaml # optional override; defaults to each provider's bundled YAML in src/skillspector/providers/ SKILLSPECTOR_LOG_LEVEL=WARNING # options: DEBUG|INFO|WARNING|ERROR +SKILLSPECTOR_MAX_CONCURRENCY=5 # concurrent LLM requests; increase for faster processing (higher = more rate limits) + # langchain/langsmith config (all optional) LANGCHAIN_TRACING_V2=false LANGCHAIN_API_KEY= diff --git a/src/skillspector/llm_analyzer_base.py b/src/skillspector/llm_analyzer_base.py index a678e4e..5ac832b 100644 --- a/src/skillspector/llm_analyzer_base.py +++ b/src/skillspector/llm_analyzer_base.py @@ -28,13 +28,14 @@ from __future__ import annotations import asyncio +import os from collections import defaultdict from dataclasses import dataclass, field from typing import Literal from pydantic import BaseModel, Field -from skillspector.llm_utils import get_chat_model +from skillspector.llm_utils import get_chat_model, retry_llm_call, retry_llm_call_sync from skillspector.logging_config import get_logger from skillspector.model_info import get_max_input_tokens from skillspector.models import Finding @@ -353,9 +354,13 @@ def run_batches( len(batch.findings), ) if self._structured_llm: - response = self._structured_llm.invoke(prompt) + response = retry_llm_call_sync( + lambda prompt=prompt: self._structured_llm.invoke(prompt) + ) else: - response = self._llm.invoke(prompt).content + response = retry_llm_call_sync( + lambda prompt=prompt: self._llm.invoke(prompt) + ).content logger.debug("LLM response for %s", batch.file_label) parsed = self.parse_response(response, batch) results.append((batch, parsed)) @@ -365,7 +370,7 @@ async def arun_batches( self, batches: list[Batch], *, - max_concurrency: int = 10, + max_concurrency: int | None = None, **kwargs: object, ) -> list[tuple[Batch, list]]: """Execute LLM calls for all *batches* concurrently. @@ -376,6 +381,9 @@ async def arun_batches( The return type mirrors :meth:`run_batches`. """ + if max_concurrency is None: + max_concurrency = int(os.environ.get("SKILLSPECTOR_MAX_CONCURRENCY", "5")) + sem = asyncio.Semaphore(max_concurrency) async def _process(batch: Batch) -> tuple[Batch, list]: @@ -388,9 +396,9 @@ async def _process(batch: Batch) -> tuple[Batch, list]: len(batch.findings), ) if self._structured_llm: - response = await self._structured_llm.ainvoke(prompt) + response = await retry_llm_call(lambda: self._structured_llm.ainvoke(prompt)) else: - response = (await self._llm.ainvoke(prompt)).content + response = (await retry_llm_call(lambda: self._llm.ainvoke(prompt))).content logger.debug("LLM response for %s", batch.file_label) return (batch, self.parse_response(response, batch)) diff --git a/src/skillspector/llm_utils.py b/src/skillspector/llm_utils.py index 1e03fc1..9bb6e89 100644 --- a/src/skillspector/llm_utils.py +++ b/src/skillspector/llm_utils.py @@ -29,7 +29,9 @@ from __future__ import annotations +import asyncio import os +import time from langchain_openai import ChatOpenAI @@ -101,3 +103,53 @@ def chat_completion(prompt: str, *, model: str | None = None) -> str: llm = get_chat_model(model=model) response = llm.invoke(prompt) return response.content or "" + + +def retry_llm_call_sync(call_func, max_attempts=3): + """Retry transient LLM errors (429, timeout) with exponential backoff (sync).""" + for attempt in range(max_attempts): + try: + return call_func() # Call it each retry + except Exception as e: + error_str = str(e).lower() + error_name = type(e).__name__.lower() + + is_retryable = ( + "429" in error_str + or "ratelimit" in error_name + or "timeout" in error_name + or "timeout" in error_str + ) + + if not is_retryable: + raise + if attempt == max_attempts - 1: + raise + + wait = 2**attempt + time.sleep(wait) + + +async def retry_llm_call(coro_func, max_attempts=3): + """Retry transient LLM errors (429, timeout) with exponential backoff (async).""" + for attempt in range(max_attempts): + try: + return await coro_func() + except Exception as e: + error_str = str(e).lower() + error_name = type(e).__name__.lower() + + is_retryable = ( + "429" in error_str + or "ratelimit" in error_name + or "timeout" in error_name + or "timeout" in error_str + ) + + if not is_retryable: + raise + if attempt == max_attempts - 1: + raise + + wait = 2**attempt + await asyncio.sleep(wait) From 619d522a5e5bb0927dfaf8e94a5a7c44650a907a Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Thu, 11 Jun 2026 23:00:08 -0400 Subject: [PATCH 4/8] Increased default max retries to 4 --- src/skillspector/llm_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/skillspector/llm_utils.py b/src/skillspector/llm_utils.py index 9bb6e89..570fcce 100644 --- a/src/skillspector/llm_utils.py +++ b/src/skillspector/llm_utils.py @@ -105,7 +105,7 @@ def chat_completion(prompt: str, *, model: str | None = None) -> str: return response.content or "" -def retry_llm_call_sync(call_func, max_attempts=3): +def retry_llm_call_sync(call_func, max_attempts=4): """Retry transient LLM errors (429, timeout) with exponential backoff (sync).""" for attempt in range(max_attempts): try: @@ -130,7 +130,7 @@ def retry_llm_call_sync(call_func, max_attempts=3): time.sleep(wait) -async def retry_llm_call(coro_func, max_attempts=3): +async def retry_llm_call(coro_func, max_attempts=4): """Retry transient LLM errors (429, timeout) with exponential backoff (async).""" for attempt in range(max_attempts): try: From f7b4951232d5837063116885b87d2b935e2f627c Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Tue, 23 Jun 2026 12:53:45 -0400 Subject: [PATCH 5/8] Expanded list of retry cases --- src/skillspector/llm_utils.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/skillspector/llm_utils.py b/src/skillspector/llm_utils.py index 570fcce..c693903 100644 --- a/src/skillspector/llm_utils.py +++ b/src/skillspector/llm_utils.py @@ -31,6 +31,7 @@ import asyncio import os +from random import random import time from langchain_openai import ChatOpenAI @@ -106,7 +107,7 @@ def chat_completion(prompt: str, *, model: str | None = None) -> str: def retry_llm_call_sync(call_func, max_attempts=4): - """Retry transient LLM errors (429, timeout) with exponential backoff (sync).""" + """Retry transient LLM errors (429, timeout, 5XX, etc) with exponential backoff (sync).""" for attempt in range(max_attempts): try: return call_func() # Call it each retry @@ -116,9 +117,17 @@ def retry_llm_call_sync(call_func, max_attempts=4): is_retryable = ( "429" in error_str + or "500" in error_str + or "502" in error_str + or "503" in error_str + or "529" in error_str or "ratelimit" in error_name or "timeout" in error_name or "timeout" in error_str + or "overloaded" in error_str + or "service unavailable" in error_str + or "bad gateway" in error_str + or "connection" in error_name ) if not is_retryable: @@ -126,12 +135,12 @@ def retry_llm_call_sync(call_func, max_attempts=4): if attempt == max_attempts - 1: raise - wait = 2**attempt + wait = 2 ** attempt + random.uniform(0, 1) time.sleep(wait) async def retry_llm_call(coro_func, max_attempts=4): - """Retry transient LLM errors (429, timeout) with exponential backoff (async).""" + """Retry transient LLM errors (429, timeout, 5XX, etc) with exponential backoff (async).""" for attempt in range(max_attempts): try: return await coro_func() @@ -141,9 +150,17 @@ async def retry_llm_call(coro_func, max_attempts=4): is_retryable = ( "429" in error_str + or "500" in error_str + or "502" in error_str + or "503" in error_str + or "529" in error_str or "ratelimit" in error_name or "timeout" in error_name or "timeout" in error_str + or "overloaded" in error_str + or "service unavailable" in error_str + or "bad gateway" in error_str + or "connection" in error_name ) if not is_retryable: @@ -151,5 +168,5 @@ async def retry_llm_call(coro_func, max_attempts=4): if attempt == max_attempts - 1: raise - wait = 2**attempt + wait = 2 ** attempt + random.uniform(0, 1) await asyncio.sleep(wait) From 8a83a378df106037dad62e663e87e0e4fed0723b Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Tue, 23 Jun 2026 12:56:12 -0400 Subject: [PATCH 6/8] Implemented suggested hardening for max_concurrency --- src/skillspector/llm_analyzer_base.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/skillspector/llm_analyzer_base.py b/src/skillspector/llm_analyzer_base.py index 5ac832b..8f004b8 100644 --- a/src/skillspector/llm_analyzer_base.py +++ b/src/skillspector/llm_analyzer_base.py @@ -382,7 +382,14 @@ async def arun_batches( The return type mirrors :meth:`run_batches`. """ if max_concurrency is None: - max_concurrency = int(os.environ.get("SKILLSPECTOR_MAX_CONCURRENCY", "5")) + raw = (os.environ.get("SKILLSPECTOR_MAX_CONCURRENCY") or "").strip() + try: + max_concurrency = int(raw) if raw else 5 + except ValueError: + logger.warning("Invalid SKILLSPECTOR_MAX_CONCURRENCY=%r; defaulting to 5", raw) + max_concurrency = 5 + if max_concurrency < 1: + max_concurrency = 1 sem = asyncio.Semaphore(max_concurrency) From bb824c771e4bf9583af55454c30da0222c5ef87a Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Tue, 23 Jun 2026 13:34:36 -0400 Subject: [PATCH 7/8] Moved max_concurrency logic to a helper function --- src/skillspector/llm_analyzer_base.py | 18 +++++++---------- src/skillspector/llm_utils.py | 29 ++++++++++++++++++++++++--- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/src/skillspector/llm_analyzer_base.py b/src/skillspector/llm_analyzer_base.py index 8f004b8..79a6db4 100644 --- a/src/skillspector/llm_analyzer_base.py +++ b/src/skillspector/llm_analyzer_base.py @@ -28,14 +28,18 @@ from __future__ import annotations import asyncio -import os from collections import defaultdict from dataclasses import dataclass, field from typing import Literal from pydantic import BaseModel, Field -from skillspector.llm_utils import get_chat_model, retry_llm_call, retry_llm_call_sync +from skillspector.llm_utils import ( + _resolve_max_concurrency, + get_chat_model, + retry_llm_call, + retry_llm_call_sync, +) from skillspector.logging_config import get_logger from skillspector.model_info import get_max_input_tokens from skillspector.models import Finding @@ -381,15 +385,7 @@ async def arun_batches( The return type mirrors :meth:`run_batches`. """ - if max_concurrency is None: - raw = (os.environ.get("SKILLSPECTOR_MAX_CONCURRENCY") or "").strip() - try: - max_concurrency = int(raw) if raw else 5 - except ValueError: - logger.warning("Invalid SKILLSPECTOR_MAX_CONCURRENCY=%r; defaulting to 5", raw) - max_concurrency = 5 - if max_concurrency < 1: - max_concurrency = 1 + max_concurrency = _resolve_max_concurrency(max_concurrency) sem = asyncio.Semaphore(max_concurrency) diff --git a/src/skillspector/llm_utils.py b/src/skillspector/llm_utils.py index c693903..51625a9 100644 --- a/src/skillspector/llm_utils.py +++ b/src/skillspector/llm_utils.py @@ -30,8 +30,9 @@ from __future__ import annotations import asyncio +import logging import os -from random import random +import random import time from langchain_openai import ChatOpenAI @@ -40,6 +41,8 @@ from skillspector.model_info import get_max_input_tokens, get_max_output_tokens from skillspector.providers import resolve_provider_credentials +logger = logging.getLogger(__name__) + def _resolve_llm_credentials() -> tuple[str, str | None]: """Return ``(api_key, base_url)`` resolved from the environment. @@ -135,7 +138,7 @@ def retry_llm_call_sync(call_func, max_attempts=4): if attempt == max_attempts - 1: raise - wait = 2 ** attempt + random.uniform(0, 1) + wait = 2**attempt + random.uniform(0, 1) time.sleep(wait) @@ -168,5 +171,25 @@ async def retry_llm_call(coro_func, max_attempts=4): if attempt == max_attempts - 1: raise - wait = 2 ** attempt + random.uniform(0, 1) + wait = 2**attempt + random.uniform(0, 1) await asyncio.sleep(wait) + + +def _resolve_max_concurrency(max_concurrency: int | None = None) -> int: + """Resolve max concurrency from an explicit value or environment.""" + + if max_concurrency is not None: + return max(max_concurrency, 1) + + raw = (os.environ.get("SKILLSPECTOR_MAX_CONCURRENCY") or "").strip() + + try: + max_concurrency = int(raw) if raw else 5 + except ValueError: + logger.warning( + "Invalid SKILLSPECTOR_MAX_CONCURRENCY=%r; defaulting to 5", + raw, + ) + max_concurrency = 5 + + return max(max_concurrency, 1) From 4500f3ceda736e8bfdbe247da2f98deb8a0fd708 Mon Sep 17 00:00:00 2001 From: Julian Hamze Date: Tue, 23 Jun 2026 13:40:00 -0400 Subject: [PATCH 8/8] Added coverage for max concurrency edge cases --- tests/nodes/test_llm_analyzer_base.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/nodes/test_llm_analyzer_base.py b/tests/nodes/test_llm_analyzer_base.py index 6bb68c2..2368000 100644 --- a/tests/nodes/test_llm_analyzer_base.py +++ b/tests/nodes/test_llm_analyzer_base.py @@ -17,6 +17,7 @@ from __future__ import annotations +import os from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -31,6 +32,7 @@ findings_in_range, number_lines, ) +from skillspector.llm_utils import _resolve_max_concurrency from skillspector.models import Finding from skillspector.nodes.meta_analyzer import ( LLMMetaAnalyzer, @@ -1310,3 +1312,24 @@ async def always_fails(prompt: str) -> LLMAnalysisResult: with pytest.raises(Exception, match="429"): await analyzer.arun_batches([batch]) + + @pytest.mark.parametrize( + ("env", "expected"), + [ + ({}, 5), + ({"SKILLSPECTOR_MAX_CONCURRENCY": ""}, 5), + ({"SKILLSPECTOR_MAX_CONCURRENCY": " "}, 5), + ({"SKILLSPECTOR_MAX_CONCURRENCY": "auto"}, 5), + ({"SKILLSPECTOR_MAX_CONCURRENCY": "0"}, 1), + ({"SKILLSPECTOR_MAX_CONCURRENCY": "-3"}, 1), + ({"SKILLSPECTOR_MAX_CONCURRENCY": "8"}, 8), + ], + ) + def test_max_concurrency_env_parsing( + self, + env: dict[str, str], + expected: int, + ) -> None: + """Verify SKILLSPECTOR_MAX_CONCURRENCY defaults and clamps safely.""" + with patch.dict(os.environ, env, clear=True): + assert _resolve_max_concurrency() == expected