-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathsloq.py
More file actions
164 lines (139 loc) · 5.71 KB
/
sloq.py
File metadata and controls
164 lines (139 loc) · 5.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from Queue import Queue
from threading import Lock
import time
class TokenBucket(object):
    """TokenBucket rate-limits operations by handing out tokens over time.

    One token accrues every ``tick`` seconds. Accrual is lazy: tokens are
    added whenever count() or take() is called, based on the time elapsed
    since the previous call, and are capped at ``max_slam``.

    The tokens will not begin accruing if you pass start=False, after which
    you will need to reset() the bucket to begin. Doing so allows you to
    specify the number of initial tokens available.

    tick:
        The interval in seconds between releasing new tokens. Must be
        greater than 0, otherwise ValueError is raised.
    start:
        Whether to begin ticking now.
    max_slam:
        The maximum number of tokens that can be queued up simultaneously.
        Having two tokens will allow two tokens to be taken at the same time
        and two events to happen immediately, regardless of tick. A falsy
        value (None or 0) means there is no limit.
    """

    # Margin in seconds kept between a planned sleep and the remaining
    # timeout. It also guarantees the remaining timeout never reaches 0,
    # which take() would interpret as "no timeout".
    _TIMEOUT_SLACK = 0.01

    def __init__(self, tick, start=True, max_slam=None):
        if tick <= 0:
            raise ValueError("TokenBucket tick must be greater than 0")
        # take_lock serialises takers (it is held while a taker sleeps);
        # count_lock guards _tokens and last_tick.
        self.take_lock = Lock()
        self.count_lock = Lock()
        self.max_slam = float(max_slam) if max_slam else float('inf')
        self.tick = tick
        self._tokens = 0.0
        self.last_tick = None
        if start:
            self.reset()

    def reset(self, tokens=0):
        """reset (re)starts accrual from the current time.

        tokens:
            The number of tokens available immediately after the reset.
        """
        with self.count_lock:
            self._tokens = float(tokens)
            self.last_tick = self.now()

    def now(self):
        """now returns a representation of the current time in seconds. Ideally
        it should contain partial seconds too.
        """
        return time.time()

    def _accrue(self):
        """_accrue adds the tokens earned since the last call and returns the
        new total. The caller must hold count_lock.
        """
        # Compare against None explicitly: a clock reading of 0.0 is a valid
        # start time, not "never started".
        if self.last_tick is None:
            raise RuntimeError("The TokenBucket has not been started. "
                               "Call reset() to start manually or pass "
                               "start=True in the constructor.")
        now = self.now()
        # A clock that steps backwards must not take tokens away.
        elapsed = max(0.0, now - self.last_tick)
        self.last_tick = now
        self._tokens = min(self.max_slam, self._tokens + elapsed / self.tick)
        return self._tokens

    def count(self):
        """count returns the number of tokens available in the bucket and is
        responsible for accruing further tokens.

        Raises RuntimeError if the bucket has not been started.
        """
        with self.count_lock:
            return self._accrue()

    def take(self, n=1, block=False, timeout=None):
        """take removes tokens from the bucket if there is sufficient amount.
        Returns True when the number of tokens have been taken, otherwise
        False. Raises RuntimeError if the bucket has not been started.

        n:
            The number of tokens to remove.
        block:
            Whether to sleep until enough tokens become available. Without a
            timeout, a request for more than max_slam tokens never returns.
        timeout:
            The amount of time in seconds we're willing to sleep before the
            tokens become available. If we can determine in advance that
            they will not be available within that time we fail early
            without sleeping. A falsy value (None or 0) means no timeout.
        """
        n = float(n)
        with self.take_lock:
            while True:
                # Accrue and withdraw under the same lock so a concurrent
                # count() cannot interleave with the decrement.
                with self.count_lock:
                    available = self._accrue()
                    if n <= available:
                        self._tokens -= n
                        return True
                if not block:
                    return False
                # Seconds until the shortfall has accrued: missing tokens
                # multiplied by the seconds each token takes.
                wait = (n - available) * self.tick
                if timeout:
                    # The bucket can never hold more than max_slam tokens,
                    # so such a request cannot succeed within any timeout.
                    if n > self.max_slam:
                        return False
                    if wait > timeout - self._TIMEOUT_SLACK:
                        return False
                    timeout -= wait
                time.sleep(wait)
class SlowQueue(object):
    """SlowQueue is a wrapper around Queue.*Queue implementations which blocks
    for an amount of time to satisfy a certain release rate. The SlowQueue
    enforces blocking on get(); every other method delegates to the wrapped
    queue.

    queue:
        The optional Queue responsible for tracking the items in play. If you
        do not specify a queue of your own we will assume the FIFO Queue.Queue
        and use maxsize.
    maxsize:
        The maxsize given to the default Queue when we need to create our own.
    release_tick:
        The seconds between releasing items from the queue. Only used where a
        token_bucket is not provided.
    token_bucket:
        The optional TokenBucket responsible for locking for periods of time.
        If not provided we'll create a TokenBucket with release_tick.
    max_slam:
        Passed to the TokenBucket we create from release_tick. Ignored when a
        token_bucket is provided.

    Raises TypeError when neither token_bucket nor release_tick is given.
    """

    def __init__(self, queue=None, maxsize=0, release_tick=None,
                 token_bucket=None, max_slam=None):
        if token_bucket is not None:
            self.token_bucket = token_bucket
            # Ensure the accrual has started; count() raises if it has not.
            self.token_bucket.count()
        elif release_tick:
            self.token_bucket = TokenBucket(release_tick, max_slam=max_slam)
        else:
            raise TypeError("SlowQueue requires a release_tick float or "
                            "token_bucket TokenBucket instance")
        # Test against None rather than truthiness: a queue type that
        # defines __len__ is falsy while empty and must not be replaced.
        if queue is None:
            queue = Queue(maxsize=maxsize)
        self.queue = queue

    def reset_tokens(self, tokens=0):
        """reset_tokens restarts the token bucket with the given number of
        tokens immediately available.
        """
        self.token_bucket.reset(tokens=tokens)

    def get(self, block=True, timeout=0):
        """get waits for an item, then waits for a token, and returns the
        item.

        Only blocking without a timeout is supported: block must be true and
        timeout must be 0 or None, otherwise ValueError is raised.
        """
        if not block or timeout not in (0, None):
            raise ValueError(
                "SlowQueue works only with block=True and timeout=0"
            )
        item = self.queue.get(block=True)
        # The item is already off the queue; hold it back until the token
        # bucket allows a release.
        self.token_bucket.take(block=True)
        return item

    def get_nowait(self):
        """get_nowait always raises ValueError because get() does not
        support non-blocking use.
        """
        return self.get(block=False)

    def qsize(self):
        """qsize returns the wrapped queue's qsize()."""
        return self.queue.qsize()

    def empty(self):
        """empty returns the wrapped queue's empty()."""
        return self.queue.empty()

    def full(self):
        """full returns the wrapped queue's full()."""
        return self.queue.full()

    def put(self, item, block=True, timeout=0):
        """put adds an item to the wrapped queue without any rate limiting.

        timeout:
            Seconds to wait for a free slot when block is true. 0 (the
            default) or None means wait indefinitely.
        """
        # The wrapped queue spells "no timeout" as None; passing 0 with
        # block=True would make a full queue fail immediately.
        return self.queue.put(item, block=block, timeout=timeout or None)

    def put_nowait(self, item):
        """put_nowait adds an item without blocking."""
        return self.put(item, block=False)

    def task_done(self):
        """task_done delegates to the wrapped queue's task_done()."""
        return self.queue.task_done()

    def join(self):
        """join delegates to the wrapped queue's join()."""
        return self.queue.join()