diff --git a/src/rotator_library/error_handler.py b/src/rotator_library/error_handler.py
index 9fd252c7..f77d5c25 100644
--- a/src/rotator_library/error_handler.py
+++ b/src/rotator_library/error_handler.py
@@ -935,7 +935,13 @@ def classify_error(e: Exception, provider: Optional[str] = None) -> ClassifiedEr
         )
 
     if isinstance(
-        e, (httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError)
+        e,
+        (
+            httpx.TimeoutException,
+            httpx.ConnectError,
+            httpx.NetworkError,
+            httpx.RemoteProtocolError,  # peer closed connection without complete message
+        ),
     ):  # [NEW]
         return ClassifiedError(
             error_type="api_connection", original_exception=e, status_code=status_code
diff --git a/src/rotator_library/providers/iflow_provider.py b/src/rotator_library/providers/iflow_provider.py
index ae292719..c3c06002 100644
--- a/src/rotator_library/providers/iflow_provider.py
+++ b/src/rotator_library/providers/iflow_provider.py
@@ -11,6 +11,7 @@
 import os
 import httpx
 import logging
+import asyncio
 from typing import Union, AsyncGenerator, List, Dict, Any, Optional
 from .provider_interface import ProviderInterface
 from .iflow_auth_base import IFlowAuthBase
@@ -19,11 +20,38 @@
 from ..timeout_config import TimeoutConfig
 from ..transaction_logger import ProviderLogger
 import litellm
-from litellm.exceptions import RateLimitError, AuthenticationError
+from litellm.exceptions import (
+    RateLimitError,
+    AuthenticationError,
+)
 from pathlib import Path
 import uuid
 from datetime import datetime
 
+# Connection error types that should trigger retries
+# These indicate transient network issues that may resolve on retry
+CONNECTION_ERROR_TYPES = (
+    httpx.RemoteProtocolError,  # peer closed connection without complete message
+    httpx.ConnectError,  # connection failed
+    httpx.ReadTimeout,  # read timeout during streaming
+    httpx.TimeoutException,  # base timeout (covers all timeout types)
+    httpx.NetworkError,  # base network error (covers all network issues)
+)
+
+# Context window error patterns to detect in error responses
+CONTEXT_WINDOW_ERROR_PATTERNS = (
+    "context_length",
+    "token limit",
+    "context window",
+    "too many tokens",
+    "too long",
+    "max_tokens",
+)
+
+# Retry configuration
+MAX_CONNECTION_RETRIES = 3
+RETRY_BACKOFF_BASE = 1.0  # Start with 1 second, then 2, 4
+
 lib_logger = logging.getLogger("rotator_library")
 
 
@@ -936,6 +964,114 @@ def _stream_to_completion_response(
 
         return litellm.ModelResponse(**final_response_data)
 
+    def _get_usage_token_count(self, usage: Any, token_key: str) -> int:
+        """Extract usage token count from dict/object usage payloads."""
+        if usage is None:
+            return 0
+
+        if isinstance(usage, dict):
+            value = usage.get(token_key, 0)
+        else:
+            value = getattr(usage, token_key, 0)
+
+        try:
+            return int(value or 0)
+        except (TypeError, ValueError):
+            return 0
+
+    def _message_to_dict(self, message: Any) -> Dict[str, Any]:
+        """Normalize response message object to dict."""
+        if message is None:
+            return {}
+        if isinstance(message, dict):
+            return message
+        if hasattr(message, "model_dump"):
+            return message.model_dump(exclude_none=False)
+        if hasattr(message, "dict"):
+            return message.dict()
+        if hasattr(message, "__dict__"):
+            return {k: v for k, v in message.__dict__.items() if not k.startswith("_")}
+        return {}
+
+    def _raise_silent_context_failure(
+        self,
+        *,
+        model: str,
+        reason: str,
+        file_logger: ProviderLogger,
+    ) -> None:
+        """Raise a non-retryable context-window style error for silent 200 failures."""
+        error_msg = f"iFlow silent context failure detected for {model}: {reason}"
+        file_logger.log_error(error_msg)
+        lib_logger.warning(error_msg)
+        request = httpx.Request("POST", "https://iflow.invalid/chat/completions")
+        response = httpx.Response(
+            status_code=400,
+            request=request,
+            text=f"context window exceeded: {error_msg}",
+        )
+        raise httpx.HTTPStatusError(
+            f"Context window exceeded: {error_msg}",
+            request=request,
+            response=response,
+        )
+
+    def _validate_final_response(
+        self,
+        *,
+        final_response: litellm.ModelResponse,
+        model: str,
+        file_logger: ProviderLogger,
+    ) -> None:
+        """Detect empty/invalid 200 responses that indicate silent context failures."""
+        choices = getattr(final_response, "choices", None) or []
+        if not choices:
+            self._raise_silent_context_failure(
+                model=model,
+                reason="HTTP 200 response had no choices",
+                file_logger=file_logger,
+            )
+
+        usage = getattr(final_response, "usage", None)
+        prompt_tokens = self._get_usage_token_count(usage, "prompt_tokens")
+        completion_tokens = self._get_usage_token_count(usage, "completion_tokens")
+        if prompt_tokens > 0 and completion_tokens == 0:
+            self._raise_silent_context_failure(
+                model=model,
+                reason=(
+                    "completion_tokens=0 with non-zero prompt_tokens "
+                    f"(prompt_tokens={prompt_tokens})"
+                ),
+                file_logger=file_logger,
+            )
+
+        first_choice = choices[0]
+        message_obj = (
+            first_choice.get("message")
+            if isinstance(first_choice, dict)
+            else getattr(first_choice, "message", None)
+        )
+        message = self._message_to_dict(message_obj)
+
+        content = message.get("content")
+        reasoning_content = message.get("reasoning_content")
+        tool_calls = message.get("tool_calls")
+        function_call = message.get("function_call")
+
+        has_content = isinstance(content, str) and bool(content.strip())
+        has_reasoning = isinstance(reasoning_content, str) and bool(
+            reasoning_content.strip()
+        )
+        has_tool_calls = isinstance(tool_calls, list) and len(tool_calls) > 0
+        has_function_call = function_call not in (None, {}, [])
+
+        if not (has_content or has_reasoning or has_tool_calls or has_function_call):
+            self._raise_silent_context_failure(
+                model=model,
+                reason="HTTP 200 response completed with empty assistant message",
+                file_logger=file_logger,
+            )
+
     async def acompletion(
         self, client: httpx.AsyncClient, **kwargs
     ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]:
@@ -981,106 +1117,202 @@ async def make_request():
             )
 
         async def stream_handler(response_stream, attempt=1):
-            """Handles the streaming response and converts chunks."""
+            """Handles the streaming response and converts chunks.
+
+            Includes retry logic for transient connection errors with exponential backoff.
+            Connection errors (RemoteProtocolError, ConnectError, ReadTimeout, etc.) are
+            retried up to MAX_CONNECTION_RETRIES times before raising for higher-level handling.
+
+            A retry re-issues the request and replays the response from the start, so it
+            is only attempted while no chunk has been yielded to the consumer yet.
+            """
             # Track state across chunks for finish_reason normalization
             stream_state: Dict[str, Any] = {}
 
-            try:
-                async with response_stream as response:
-                    # Check for HTTP errors before processing stream
-                    if response.status_code >= 400:
-                        error_text = await response.aread()
-                        error_text = (
-                            error_text.decode("utf-8")
-                            if isinstance(error_text, bytes)
-                            else error_text
-                        )
-                        # Handle 401: Force token refresh and retry once
-                        if response.status_code == 401 and attempt == 1:
-                            lib_logger.warning(
-                                "iFlow returned 401. Forcing token refresh and retrying once."
-                            )
-                            await self._refresh_token(credential_path, force=True)
-                            retry_stream = await make_request()
-                            async for chunk in stream_handler(retry_stream, attempt=2):
-                                yield chunk
-                            return
-
-                        # Handle 429: Rate limit
-                        elif (
-                            response.status_code == 429
-                            or "slow_down" in error_text.lower()
-                        ):
-                            raise RateLimitError(
-                                f"iFlow rate limit exceeded: {error_text}",
-                                llm_provider="iflow",
-                                model=model,
-                                response=response,
+            # Connection error retry state
+            connection_retry_count = 0
+            current_stream = response_stream
+            chunks_yielded = False  # True once any chunk has reached the consumer
+
+            while connection_retry_count <= MAX_CONNECTION_RETRIES:
+                try:
+                    async with current_stream as response:
+                        # Check for HTTP errors before processing stream
+                        if response.status_code >= 400:
+                            error_text = await response.aread()
+                            error_text = (
+                                error_text.decode("utf-8")
+                                if isinstance(error_text, bytes)
+                                else error_text
                             )
+                            error_text_lower = error_text.lower()
 
-                        # Handle other errors
-                        else:
-                            if not error_text:
-                                content_type = response.headers.get("content-type", "")
-                                error_text = (
-                                    f"(empty response body, content-type={content_type})"
+                            # Handle 401: Force token refresh and retry once
+                            if response.status_code == 401 and attempt == 1:
+                                lib_logger.warning(
+                                    "iFlow returned 401. Forcing token refresh and retrying once."
+                                )
+                                await self._refresh_token(credential_path, force=True)
+                                retry_stream = await make_request()
+                                async for chunk in stream_handler(
+                                    retry_stream, attempt=2
+                                ):
+                                    chunks_yielded = True
+                                    yield chunk
+                                return
+
+                            # Handle 429: Rate limit
+                            elif (
+                                response.status_code == 429
+                                or "slow_down" in error_text_lower
+                            ):
+                                raise RateLimitError(
+                                    f"iFlow rate limit exceeded: {error_text}",
+                                    llm_provider="iflow",
+                                    model=model,
+                                    response=response,
                                 )
-                            error_msg = (
-                                f"iFlow HTTP {response.status_code} error: {error_text}"
-                            )
-                            file_logger.log_error(error_msg)
-                            raise httpx.HTTPStatusError(
-                                f"HTTP {response.status_code}: {error_text}",
-                                request=response.request,
-                                response=response,
-                            )
-
-                    # Process successful streaming response
-                    async for line in response.aiter_lines():
-                        file_logger.log_response_chunk(line)
 
-                        # CRITICAL FIX: Handle both "data:" (no space) and "data: " (with space)
-                        if line.startswith("data:"):
-                            # Extract data after "data:" prefix, handling both formats
-                            if line.startswith("data: "):
-                                data_str = line[6:]  # Skip "data: "
+                            # Handle other errors
                             else:
-                                data_str = line[5:]  # Skip "data:"
-
-                            if data_str.strip() == "[DONE]":
-                                # lib_logger.debug("iFlow: Received [DONE] marker")
-                                break
-                            try:
-                                chunk = json.loads(data_str)
+                                if not error_text:
+                                    content_type = response.headers.get(
+                                        "content-type", ""
+                                    )
+                                    error_text = f"(empty response body, content-type={content_type})"
+                                    error_text_lower = error_text.lower()
 
-                                for openai_chunk in self._convert_chunk_to_openai(
-                                    chunk, model, stream_state
+                                # Detect context window errors from response body
+                                if any(
+                                    pattern in error_text_lower
+                                    for pattern in CONTEXT_WINDOW_ERROR_PATTERNS
                                 ):
-                                    yield litellm.ModelResponse(**openai_chunk)
-                            except json.JSONDecodeError:
-                                lib_logger.warning(
-                                    f"Could not decode JSON from iFlow: {line}"
+                                    error_msg = (
+                                        f"iFlow context window exceeded: {error_text}"
+                                    )
+                                    file_logger.log_error(error_msg)
+                                    lib_logger.warning(
+                                        f"iFlow context window error detected: {error_text}"
+                                    )
+                                    raise httpx.HTTPStatusError(
+                                        f"Context window exceeded: {error_text}",
+                                        request=response.request,
+                                        response=response,
+                                    )
+
+                                error_msg = f"iFlow HTTP {response.status_code} error: {error_text}"
+                                file_logger.log_error(error_msg)
+                                raise httpx.HTTPStatusError(
+                                    f"HTTP {response.status_code}: {error_text}",
+                                    request=response.request,
+                                    response=response,
                                 )
 
-            except httpx.HTTPStatusError:
-                raise  # Re-raise HTTP errors we already handled
-            except Exception as e:
-                file_logger.log_error(f"Error during iFlow stream processing: {e}")
-                lib_logger.error(
-                    f"Error during iFlow stream processing: {e}", exc_info=True
-                )
-                raise
+                        # Process successful streaming response
+                        async for line in response.aiter_lines():
+                            file_logger.log_response_chunk(line)
+
+                            # CRITICAL FIX: Handle both "data:" (no space) and "data: " (with space)
+                            if line.startswith("data:"):
+                                # Extract data after "data:" prefix, handling both formats
+                                if line.startswith("data: "):
+                                    data_str = line[6:]  # Skip "data: "
+                                else:
+                                    data_str = line[5:]  # Skip "data:"
+
+                                if data_str.strip() == "[DONE]":
+                                    # lib_logger.debug("iFlow: Received [DONE] marker")
+                                    break
+                                try:
+                                    chunk = json.loads(data_str)
+
+                                    for openai_chunk in self._convert_chunk_to_openai(
+                                        chunk, model, stream_state
+                                    ):
+                                        chunks_yielded = True
+                                        yield litellm.ModelResponse(**openai_chunk)
+                                except json.JSONDecodeError:
+                                    lib_logger.warning(
+                                        f"Could not decode JSON from iFlow: {line}"
+                                    )
+
+                    # Successfully completed without errors, exit retry loop
+                    return
+
+                except httpx.HTTPStatusError:
+                    raise  # Re-raise HTTP errors we already handled
+
+                except CONNECTION_ERROR_TYPES as e:
+                    connection_retry_count += 1
+                    error_type_name = type(e).__name__
+
+                    if chunks_yielded:
+                        # Replaying the request would resend output the consumer
+                        # already received, so surface the error instead of retrying.
+                        error_msg = (
+                            "iFlow connection error after partial stream, not retrying: "
+                            f"{error_type_name}: {e}"
+                        )
+                        file_logger.log_error(error_msg)
+                        lib_logger.error(error_msg)
+                        raise
+
+                    if connection_retry_count > MAX_CONNECTION_RETRIES:
+                        # Max retries exhausted, log and re-raise for higher-level handling
+                        error_msg = (
+                            f"iFlow connection error after {MAX_CONNECTION_RETRIES} retries: "
+                            f"{error_type_name}: {e}"
+                        )
+                        file_logger.log_error(error_msg)
+                        lib_logger.error(error_msg)
+                        raise
+
+                    # Exponential backoff: 1s, 2s, 4s
+                    backoff = RETRY_BACKOFF_BASE * (2 ** (connection_retry_count - 1))
+                    lib_logger.warning(
+                        f"iFlow connection error ({error_type_name}) on attempt "
+                        f"{connection_retry_count}/{MAX_CONNECTION_RETRIES}: {e}. "
+                        f"Retrying in {backoff}s..."
+                    )
+                    await asyncio.sleep(backoff)
+
+                    # Discard normalization state left over from the failed attempt
+                    stream_state.clear()
+                    # Re-create the stream for retry
+                    current_stream = await make_request()
+                    # Continue the while loop to retry
+
+                except Exception as e:
+                    file_logger.log_error(f"Error during iFlow stream processing: {e}")
+                    lib_logger.error(
+                        f"Error during iFlow stream processing: {e}", exc_info=True
+                    )
+                    raise
 
         async def logging_stream_wrapper():
             """Wraps the stream to log the final reassembled response and cache reasoning."""
             openai_chunks = []
+            stream_completed = False
             try:
                 async for chunk in stream_handler(await make_request()):
                     openai_chunks.append(chunk)
                     yield chunk
+                stream_completed = True
             finally:
-                if openai_chunks:
+                if stream_completed:
+                    if not openai_chunks:
+                        self._raise_silent_context_failure(
+                            model=model,
+                            reason="HTTP 200 stream ended without any data chunks",
+                            file_logger=file_logger,
+                        )
+
                     final_response = self._stream_to_completion_response(openai_chunks)
+                    self._validate_final_response(
+                        final_response=final_response,
+                        model=model,
+                        file_logger=file_logger,
+                    )
                     file_logger.log_final_response(final_response.dict())
 
                     # Store reasoning_content from the response for future multi-turn conversations