-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathretry_agent.py
More file actions
151 lines (128 loc) · 5.54 KB
/
retry_agent.py
File metadata and controls
151 lines (128 loc) · 5.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
"""Retry model example with custom retry configuration.
This example demonstrates how to wrap an LLM model with RetryModel to add
automatic retry logic with exponential backoff for transient failures.
The RetryModel:
- Wraps any LLMModel (e.g., OpenAIModel, FakeModel)
- Retries non-streaming completion calls on network/HTTP errors
- Uses exponential backoff with configurable parameters (max_attempts, min_wait, max_wait)
- Skips retry logic for streaming calls (passes through directly)
- Logs retry attempts with structured fields (attempt number, error, wait time)
Custom RetryConfig allows fine-tuning:
- max_attempts: Maximum number of attempts (default: 3)
- multiplier: Exponential backoff multiplier (default: 1.0)
- min_wait: Minimum wait time between retries in seconds (default: 4.0)
- max_wait: Maximum wait time between retries in seconds (default: 60.0)
- retry_status_codes: HTTP status codes to retry on (default: 429, 500, 502, 503, 504)
Usage:
1. Copy .env.example to .env and fill in your API credentials
2. Run: uv run python examples/retry_agent.py
Environment variables:
LLM_API_KEY — API key for the LLM model (optional; when unset, a FakeModel with a canned response is used)
LLM_BASE_URL — Base URL for the API (default: https://dashscope.aliyuncs.com/compatible-mode/v1)
LLM_MODEL — Model name (default: qwen3.5-plus)
"""
from __future__ import annotations
import asyncio
import os
import sys
from ecs_agent.logging import configure_logging, get_logger
from ecs_agent.providers import FakeModel, Model
from ecs_agent.providers.config import ApiFormat
from ecs_agent.providers.retry_model import RetryModel
from ecs_agent.types import CompletionResult, Message, RetryConfig, Usage
logger = get_logger(__name__)
async def main() -> None:
    """Send one completion request through a retry-wrapped model and print it.

    Steps, in order:

    1. Configure logging and read ``LLM_API_KEY``, ``LLM_BASE_URL`` and
       ``LLM_MODEL`` from the environment.
    2. Build the base model: a real ``Model`` when an API key is set,
       otherwise a ``FakeModel`` holding a single canned response.
    3. Build a custom ``RetryConfig``, print it, and wrap the base model in
       ``RetryModel``.
    4. Await a single ``complete`` call and print the result.

    If the ``complete`` call raises, the error is logged and printed and the
    process exits with status 1. Errors raised while printing the result are
    deliberately not caught, so they are not misreported as completion
    failures.
    """
    # --- Configure logging ---
    configure_logging(json_output=False)

    # --- Load config from environment ---
    api_key = os.environ.get("LLM_API_KEY", "")
    base_url = os.environ.get(
        "LLM_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"
    )
    # Kept separate from the model object below so one name never holds
    # both a string and a model instance.
    model_name = os.environ.get("LLM_MODEL", "qwen3.5-plus")

    # --- Create base model ---
    if api_key:
        print(f"Using model: {model_name}")
        print(f"Base URL: {base_url}")
        base_model = Model(
            model_name,
            base_url=base_url,
            api_key=api_key,
            api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
        )
    else:
        print("No LLM_API_KEY provided. Using FakeModel for demonstration.")
        # Create a fake model with a realistic response
        fake_result = CompletionResult(
            message=Message(
                role="assistant",
                content="This is a demonstration response from the retry-wrapped model. "
                "In production, the RetryModel would transparently retry "
                "on transient errors like rate limits (429) and server errors (500, 502, 503, 504).",
            ),
            usage=Usage(
                prompt_tokens=15,
                completion_tokens=45,
                total_tokens=60,
            ),
        )
        base_model = FakeModel(responses=[fake_result])

    # --- Create custom retry configuration ---
    # This demonstrates custom retry parameters beyond the defaults
    retry_config = RetryConfig(
        max_attempts=5,  # Allow up to 5 attempts instead of default 3
        multiplier=2.0,  # Use 2x exponential backoff instead of 1x
        min_wait=2.0,  # Start with 2 seconds instead of 4
        max_wait=30.0,  # Cap at 30 seconds instead of 60
        retry_status_codes=(429, 500, 502, 503, 504),  # Retry on these HTTP errors
    )
    print()
    print("Retry Configuration:")
    print(f" max_attempts: {retry_config.max_attempts}")
    print(f" multiplier: {retry_config.multiplier}")
    print(f" min_wait: {retry_config.min_wait}s")
    print(f" max_wait: {retry_config.max_wait}s")
    print(f" retry_status_codes: {retry_config.retry_status_codes}")
    print()

    # --- Wrap model with retry logic ---
    retry_model = RetryModel(base_model, retry_config=retry_config)

    # --- Make a completion request ---
    messages = [
        Message(
            role="user",
            content="Explain how the RetryModel works with exponential backoff.",
        )
    ]
    print("Making completion request through retry-wrapped model...")
    print()

    # Only the model call is guarded; reporting happens in ``else`` so that a
    # formatting error is never logged as "completion_failed".
    try:
        result = await retry_model.complete(messages=messages)
    except Exception as e:
        # Top-level boundary of the example: log, tell the user, and exit
        # with a non-zero status.
        logger.error(
            "completion_failed",
            error=str(e),
            error_type=type(e).__name__,
        )
        print(f"✗ Completion failed with error: {e}")
        sys.exit(1)
    else:
        print("Completion Result:")
        print(f" Role: {result.message.role}")
        print(f" Content: {result.message.content}")
        if result.usage:
            print(
                f" Tokens - Prompt: {result.usage.prompt_tokens}, "
                f"Completion: {result.usage.completion_tokens}, "
                f"Total: {result.usage.total_tokens}"
            )
        print()
        print("✓ Completion succeeded (no retries needed for this request)")

    # --- Demonstrate retry behavior in logging ---
    print()
    print("Notes:")
    print("- If this request had encountered transient errors (429, 500, etc.),")
    print(
        " the RetryModel would have automatically retried with exponential backoff"
    )
    print("- Retry attempts are logged at WARNING level with structured fields:")
    print(" {attempt, error, wait_seconds}")
    print(
        "- Streaming calls bypass retry logic and are passed directly to the base model"
    )
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    example_coro = main()
    asyncio.run(example_coro)