Skip to content

Commit 77ef672

Browse files
Tests and Documentation Updates (#2)
* Retry only on 429 and 500. Additional tests and documentation * Update version * Change client-server timeout padding
1 parent b891abe commit 77ef672

11 files changed

Lines changed: 320 additions & 64 deletions

File tree

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ from reach import ReachClient
2929
```
3030
2) Pass your [API Key](https://app.versium.com/account/manage-api-keys) to the ReachClient constructor.
3131
```python
32-
client = ReachClient('path-key-012345678')
32+
client = ReachClient('api-key-012345678')
3333
```
3434
3) Run the `append` method of your `ReachClient` object with the API name, input records, desired outputs (if applicable),
3535
and any extra config parameters you wish to pass.
@@ -81,6 +81,9 @@ Results are returned as a list of QueryResult objects, which contain the followi
8181
If the client errored out during a request, this stores the error object
8282

8383

84+
- **error_msg**:
85+
Stores additional info about query errors.
86+
8487
# Things to keep in mind
8588
- The default rate limit for Reach APIs is 20 queries per second
8689
- You must have a provisioned API key for this function to work. If you are unsure where to find your API key,

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "versium-reach-sdk"
7-
version = "1.0.0"
7+
version = "1.1.0"
88
authors = [
99
{ name="Versium Analytics, Inc.", email="opensource@versium.com" },
1010
]

reach/append.py

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,33 @@
1414
API_VERSION = "/v2/"
1515

1616

17-
async def _fetch(session, record, query_params, path, headers, attempts_left):
18-
"""Internal fetch method."""
17+
async def _fetch(session, record, query_params, path, headers):
18+
"""Make an HTTP request to the API
19+
20+
Parameters
21+
----------
22+
session : aiohttp.ClientSession
23+
24+
record : QueryRecord
25+
26+
query_params : dict
27+
Additional query parameters to pass to the API call
28+
29+
path : string
30+
Full path of the Versium Reach API endpoint
31+
32+
headers : dict
33+
Additional headers to pass with the HTTP request.
34+
35+
Returns
36+
-------
37+
QueryResult
38+
"""
1939
if query_params is None:
2040
query_params = {}
2141
row_dict = {key: value for key, value in record.data.items() if value is not None}
2242
idx = record.index
43+
err_msg = ""
2344
result = QueryResult()
2445

2546
params = {**query_params, **row_dict}
@@ -32,8 +53,9 @@ async def _fetch(session, record, query_params, path, headers, attempts_left):
3253
result.headers = dict(response.headers)
3354

3455
if not result.success:
35-
logger.error(f"Unsuccessful url fetch: {result.reason}\n\tIndex: {idx}\n\tURL: {API_BASE_URL + path}?{urllib.parse.urlencode(params)}"
36-
f"\n\tResponse Status: {result.http_status}\n\tAttempts Left: {attempts_left:d}")
56+
err_msg = f"Unsuccessful url fetch: {result.reason}\n\tIndex: {idx}\n\tURL: {API_BASE_URL + path}?{urllib.parse.urlencode(params)}"\
57+
f"\n\tResponse Status: {result.http_status}"
58+
result.error_msg = err_msg
3759
return result
3860

3961
result.body_raw = await response.read()
@@ -51,13 +73,50 @@ async def _fetch(session, record, query_params, path, headers, attempts_left):
5173
except aiohttp.ClientError as e:
5274
result.request_error = e
5375
status = getattr(response, "status", "UNKNOWN")
54-
logger.error(f"Error during url fetch: {e.message}\n\tIndex: {idx}\n\tURL: {path}?{urllib.parse.urlencode(params)}"
55-
f"\n\tResponse Status: {status}\n\tAttempts Left: {attempts_left:d}")
76+
err_msg = f"Error during url fetch: {e.message}\n\tIndex: {idx}\n\tURL: {path}?{urllib.parse.urlencode(params)}"\
77+
f"\n\tResponse Status: {status}"
78+
result.error_msg = err_msg
5679
return result
5780

5881

59-
async def _create_tasks(api, records, query_params, headers=None, *, queries_per_second=20, n_connections=100, timeout=20, n_retry=3,
60-
retry_wait_time=3):
82+
async def _create_tasks(api, records, query_params, headers=None, *, n_retry=3, queries_per_second=20, n_connections=100, retry_wait_time=3,
83+
timeout=20):
84+
""" Split the API calls into asynchronous tasks and wrap them in a rate limiter.
85+
86+
Parameters
87+
----------
88+
api : string
89+
Specifies the name of the Versium Reach API endpoint to query ('contact', 'demographic', 'b2conlineaudience', etc.)
90+
91+
records : list[QueryRecord]
92+
List containing QueryRecord objects
93+
94+
query_params : dict
95+
Additional query parameters to pass to each API call (e.g. {'cfg_max_recs': 1})
96+
97+
headers : dict
98+
Additional header parameters to pass to the API call.
99+
100+
n_retry : int
101+
Number of times to retry the query if it fails.
102+
103+
queries_per_second : int
104+
Maximum number of queries to perform each second to avoid 429 errors.
105+
106+
n_connections : int
107+
Number of simultaneous calls to make when querying.
108+
109+
retry_wait_time : int
110+
Number of seconds to wait until retrying a failed query. The wait time is increased by a multiple of `retry_wait_time` every time
111+
the query fails (e.g. 0, 3, 6, 9, 12, etc.)
112+
113+
timeout : float
114+
Number of seconds to wait for the response before timing out.
115+
116+
Returns
117+
-------
118+
list[dict]: List of responses from the API calls. This will be in the same order as given in the input.
119+
"""
61120
tasks = []
62121
limit = RateLimiter(max_calls=queries_per_second,
63122
period=1,
@@ -78,6 +137,42 @@ async def _create_tasks(api, records, query_params, headers=None, *, queries_per
78137

79138
def query_api(api, records, query_params, headers=None, *, n_retry=3, queries_per_second=20, n_connections=3, retry_wait_time=3,
80139
timeout=3):
140+
""" Query the Versium Reach API and return the results.
141+
142+
Parameters
143+
----------
144+
api : string
145+
Specifies the name of the Versium Reach API endpoint to query ('contact', 'demographic', 'b2conlineaudience', etc.)
146+
147+
records : list[dict]
148+
List of records, each a dict of key/value pairs, e.g. [{'first': 'John', 'last': 'Smith'}]
149+
150+
query_params : dict
151+
Additional query parameters to pass to each API call (e.g. {'cfg_max_recs': 1})
152+
153+
headers : dict
154+
Additional header parameters to pass to the API call.
155+
156+
n_retry : int
157+
Number of times to retry the query if it fails.
158+
159+
queries_per_second : int
160+
Maximum number of queries to perform each second to avoid 429 errors.
161+
162+
n_connections : int
163+
Number of simultaneous calls to make when querying.
164+
165+
retry_wait_time : int
166+
Number of seconds to wait until retrying a failed query. The wait time is increased by a multiple of `retry_wait_time` every time
167+
the query fails (e.g. 0, 3, 6, 9, 12, etc.)
168+
169+
timeout : float
170+
Number of seconds to wait for the response before timing out.
171+
172+
Returns
173+
-------
174+
list[dict]: List of responses from the API calls. This will be in the same order as given in the input.
175+
"""
81176

82177
if len(records) < 1:
83178
logger.warning("No input records were given.")

reach/query_data.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,13 @@ class QueryResult:
4444
4545
request_error: aiohttp.ClientError
4646
If the client errored out during a request, this stores the error object
47+
48+
error_msg: string
49+
Additional error message
4750
"""
4851

4952
def __init__(self, body=None, success=False, match_found=False, *, http_status=None, reason=None, headers=None,
50-
body_raw=None, request_error=None):
53+
body_raw=None, request_error=None, error_msg=""):
5154
if body is None:
5255
body = {}
5356
self.body = body
@@ -59,6 +62,7 @@ def __init__(self, body=None, success=False, match_found=False, *, http_status=N
5962
self.body_raw = body_raw
6063
self.request_error = request_error
6164
self.reason = reason
65+
self.error_msg = error_msg
6266

6367
def __repr__(self):
6468
headers = str(self.headers)

reach/rate_limiter.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import asyncio
2+
import logging
23
import time
34

5+
logger = logging.getLogger(__name__)
6+
47

58
class RateLimiter(object):
69
""" Limits the number of calls to a function within a timeframe. Also limits the number of total active function calls.
@@ -34,6 +37,17 @@ def __init__(self, *, max_calls=20, period=1, n_connections=100, n_retry=3, retr
3437
self.sem = asyncio.Semaphore(n_connections)
3538

3639
def __call__(self, func):
40+
"""
41+
42+
Parameters
43+
----------
44+
func : Callable
45+
function that returns a QueryResult object
46+
47+
Returns
48+
-------
49+
Callable: Input function wrapped with a rate limiting functionality
50+
"""
3751

3852
async def wrapper(*args, **kwargs):
3953
# The semaphore prevents more than `n_connections` calls from being in flight at once.
@@ -44,16 +58,32 @@ async def wrapper(*args, **kwargs):
4458
await asyncio.sleep(self.__period_remaining())
4559

4660
self.num_calls += 1
47-
result = await func(*args, attempts_left=self.n_retry - i, **kwargs)
48-
if not result.success and self.n_retry - i > 0:
49-
await asyncio.sleep(self.retry_wait_time * i)
61+
result = await func(*args, **kwargs)
62+
if result.success:
63+
return result
5064

65+
if (result.http_status in (429, 500)) and (self.n_retry - i > 0):
66+
logger.error(result.error_msg + f"\n\tAttempts Left: {self.n_retry - i: d}")
67+
await asyncio.sleep(self.retry_wait_time * i)
68+
continue
69+
elif self.n_retry - i <= 0:
70+
logger.error(result.error_msg + f"\n\tNo attempts left.")
5171
else:
52-
return result
72+
logger.error(result.error_msg + f"\n\tNot retrying for http status: {result.http_status}")
73+
74+
return result
5375

5476
return wrapper
5577

5678
def __period_remaining(self):
79+
""" Gets the amount of time remaining in the period. If there is no time remaining, resets the call counter and updates the reset
80+
timer.
81+
82+
Returns
83+
-------
84+
float: Amount of time remaining in this period
85+
86+
"""
5787
elapsed = self.clock() - self.last_reset
5888
period_remaining = self.period - elapsed
5989
if period_remaining <= 0:

reach/reach.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from .append import query_api
22
import logging
33

4+
CLIENT_SERVER_TIMEOUT_PADDING = 0.2
5+
46
logger = logging.getLogger(__name__)
57

68

@@ -40,6 +42,9 @@ def __init__(self, api_key, *, queries_per_second=20, n_connections=100, timeout
4042
self.n_retry = n_retry
4143
self.retry_wait_time = retry_wait_time
4244

45+
if timeout <= 0:
46+
raise ValueError(f"`timeout` must be greater than 0! Instead got {timeout}.")
47+
4348
def append(self, api_name, input_records, outputs=(), config_params=None):
4449
"""Perform an append on the input records and return the results.
4550
@@ -63,11 +68,13 @@ def append(self, api_name, input_records, outputs=(), config_params=None):
6368
-------
6469
list[QueryResult]: A list of QueryResult objects
6570
"""
66-
query_params = dict()
71+
query_params = {"cfg_max_recs": 1}
6772
if config_params is None:
6873
config_params = dict()
6974

7075
query_params.update(config_params)
76+
# Make the server-side timeout slightly shorter than the client-side timeout so the server can respond before the client gives up.
77+
query_params['rcfg_max_time'] = self.timeout - min(self.timeout/2.0, CLIENT_SERVER_TIMEOUT_PADDING)
7178

7279
query_params["output[]"] = list(set(outputs)) # remove duplicate outputs
7380

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def setup_package():
4949

5050
setup(
5151
name='versium-reach-sdk',
52-
version='1.0.0',
52+
version='1.1.0',
5353
description='Python SDK for querying Versium Reach APIs',
5454
long_description=long_description,
5555
url='https://github.com/VersiumAnalytics/reach-api-python-sdk',

tests/base.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import logging
2+
from unittest.mock import patch
3+
4+
from aiohttp.test_utils import AioHTTPTestCase, TestServer, Application
5+
6+
from reach import append
7+
from tests.utils import make_app, RequestHandler, RateChecker
8+
9+
10+
class BaseTestCase(AioHTTPTestCase):
11+
12+
async def get_application(self):
13+
"""
14+
Override the get_application method to return your application.
15+
"""
16+
self.rate_checker = RateChecker(max_calls=5, max_connections=5, min_calls=1, min_connections=1, period=1)
17+
self.request_handler = RequestHandler(self.rate_checker, response_time=0)
18+
return make_app(self.request_handler)
19+
20+
async def get_server(self, app: Application) -> TestServer:
21+
"""Return a TestServer instance."""
22+
test_server = TestServer(app, loop=self.loop, skip_url_asserts=False)
23+
return test_server
24+
25+
async def setUpAsync(self):
26+
await super().setUpAsync()
27+
patcher = patch.object(append.aiohttp, 'ClientSession', autospec=True)
28+
self.addCleanup(patcher.stop)
29+
self.ClientSession = patcher.start()
30+
self.ClientSession.return_value = self.client
31+
32+
# Silence aiohttp logs
33+
aiohttp_logs = ['aiohttp.access',
34+
'aiohttp.client',
35+
'aiohttp.internal',
36+
'aiohttp.server',
37+
'aiohttp.web',
38+
'aiohttp.websocket']
39+
for log_name in aiohttp_logs:
40+
logging.getLogger(log_name).handlers = []

0 commit comments

Comments
 (0)