|
| 1 | +""" |
| 2 | +Exception handling configuration for rollout processors with backoff retry logic. |
| 3 | +""" |
| 4 | + |
| 5 | +import os |
| 6 | +from dataclasses import dataclass, field |
| 7 | +from typing import Callable, Set, Type, Union |
| 8 | + |
| 9 | +import backoff |
| 10 | + |
| 11 | + |
| 12 | +import requests |
| 13 | +import httpx |
| 14 | + |
| 15 | +# Default exceptions that should be retried with backoff |
| 16 | +DEFAULT_RETRYABLE_EXCEPTIONS: Set[Type[Exception]] = { |
| 17 | + # Standard library exceptions |
| 18 | + ConnectionError, |
| 19 | + TimeoutError, |
| 20 | + OSError, # Covers network-related OS errors |
| 21 | + # Requests library exceptions |
| 22 | + requests.exceptions.ConnectionError, |
| 23 | + requests.exceptions.Timeout, |
| 24 | + requests.exceptions.HTTPError, |
| 25 | + requests.exceptions.RequestException, |
| 26 | + # HTTPX library exceptions |
| 27 | + httpx.ConnectError, |
| 28 | + httpx.TimeoutException, |
| 29 | + httpx.NetworkError, |
| 30 | + httpx.RemoteProtocolError, |
| 31 | +} |
| 32 | + |
| 33 | + |
| 34 | +@dataclass |
| 35 | +class BackoffConfig: |
| 36 | + """Configuration for backoff behavior.""" |
| 37 | + |
| 38 | + # Backoff strategy: 'expo' for exponential, 'constant' for constant delay |
| 39 | + strategy: str = "expo" |
| 40 | + |
| 41 | + # Base delay in seconds |
| 42 | + base_delay: float = 1.0 |
| 43 | + |
| 44 | + # Maximum delay in seconds |
| 45 | + max_delay: float = 60.0 |
| 46 | + |
| 47 | + # Maximum number of retry attempts |
| 48 | + max_tries: int = 3 |
| 49 | + |
| 50 | + # Jitter: adds randomness to backoff delays (None = no jitter for predictable timing) |
| 51 | + jitter: Union[None, Callable] = None |
| 52 | + |
| 53 | + # Factor for exponential backoff (only used if strategy == 'expo') |
| 54 | + factor: float = 2.0 |
| 55 | + |
| 56 | + # Whether to raise the exception when giving up (instead of returning it) |
| 57 | + raise_on_giveup: bool = True |
| 58 | + |
| 59 | + # Optional custom giveup function - if provided, overrides the default exception handling logic |
| 60 | + giveup_func: Callable[[Exception], bool] = lambda e: False |
| 61 | + |
| 62 | + def get_backoff_decorator(self, exceptions: Set[Type[Exception]]): |
| 63 | + """Get the appropriate backoff decorator based on configuration.""" |
| 64 | + if not exceptions: |
| 65 | + # If no exceptions specified, return a no-op decorator |
| 66 | + def no_op_decorator(func): |
| 67 | + return func |
| 68 | + |
| 69 | + return no_op_decorator |
| 70 | + |
| 71 | + if self.strategy == "expo": |
| 72 | + return backoff.on_exception( |
| 73 | + backoff.expo, |
| 74 | + tuple(exceptions), |
| 75 | + max_tries=self.max_tries, |
| 76 | + base=self.base_delay, |
| 77 | + max_value=self.max_delay, |
| 78 | + factor=self.factor, |
| 79 | + jitter=self.jitter, |
| 80 | + giveup=self.giveup_func, |
| 81 | + raise_on_giveup=self.raise_on_giveup, |
| 82 | + ) |
| 83 | + elif self.strategy == "constant": |
| 84 | + return backoff.on_exception( |
| 85 | + backoff.constant, |
| 86 | + tuple(exceptions), |
| 87 | + max_tries=self.max_tries, |
| 88 | + interval=self.base_delay, |
| 89 | + jitter=self.jitter, |
| 90 | + giveup=self.giveup_func, |
| 91 | + raise_on_giveup=self.raise_on_giveup, |
| 92 | + ) |
| 93 | + else: |
| 94 | + raise ValueError(f"Unknown backoff strategy: {self.strategy}") |
| 95 | + |
| 96 | + |
| 97 | +@dataclass |
| 98 | +class ExceptionHandlerConfig: |
| 99 | + """Configuration for exception handling in rollout processors.""" |
| 100 | + |
| 101 | + # Exceptions that should be retried using backoff |
| 102 | + retryable_exceptions: Set[Type[Exception]] = field(default_factory=lambda: DEFAULT_RETRYABLE_EXCEPTIONS.copy()) |
| 103 | + |
| 104 | + # Backoff configuration |
| 105 | + backoff_config: BackoffConfig = field(default_factory=BackoffConfig) |
| 106 | + |
| 107 | + def __post_init__(self): |
| 108 | + """Automatically apply environment variable overrides after initialization.""" |
| 109 | + # Override backoff settings from environment variables |
| 110 | + if "EP_MAX_RETRY" in os.environ: |
| 111 | + max_retry = int(os.environ["EP_MAX_RETRY"]) |
| 112 | + if max_retry > 0: |
| 113 | + self.backoff_config.max_tries = max_retry |
| 114 | + |
| 115 | + if "EP_FAIL_ON_MAX_RETRY" in os.environ: |
| 116 | + fail_on_max_retry = os.environ["EP_FAIL_ON_MAX_RETRY"].lower() |
| 117 | + self.backoff_config.raise_on_giveup = fail_on_max_retry != "false" |
| 118 | + |
| 119 | + def get_backoff_decorator(self): |
| 120 | + """Get the backoff decorator configured for this exception handler.""" |
| 121 | + return self.backoff_config.get_backoff_decorator(self.retryable_exceptions) |
| 122 | + |
| 123 | + |
| 124 | +def get_default_exception_handler_config() -> ExceptionHandlerConfig: |
| 125 | + """Get a fresh default exception handler configuration.""" |
| 126 | + return ExceptionHandlerConfig() |
0 commit comments