-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_disque.py
More file actions
71 lines (59 loc) · 2.12 KB
/
test_disque.py
File metadata and controls
71 lines (59 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import argparse,os,sys,time,subprocess
import logging
import utils
import sys
sys.path.insert(0, './lib/pydisque')
import pydisque.client
logger = logging.getLogger(__name__)
class DisqueTester(utils.QueueTester):
    """Queue tester backed by a locally spawned Disque server.

    Extends ``utils.QueueTester`` with methods to start and stop a
    ``disque-server`` process and to send and receive jobs through a
    ``pydisque`` client.
    """

    # Popen handle of the spawned disque-server; None while no server is held.
    server_process = None

    def start_server(self):
        """Spawn disque-server and block until it answers ``info()``.

        The server's stdout and stderr are discarded when
        ``self.options.loglevel`` equals ``logging.WARN``; otherwise they are
        inherited from this process.

        Raises:
            OSError: if the server does not answer within 10 seconds. The
                spawned process is killed and reaped before raising.
        """
        quiet = self.options.loglevel == logging.WARN
        devnull = open(os.devnull, 'w') if quiet else None
        try:
            self.server_process = subprocess.Popen(
                ['./servers/disque/src/disque-server', '--loglevel', 'warning'],
                stdout=devnull,
                stderr=devnull,
            )
        finally:
            # The child has its own copy of the descriptor once Popen returns,
            # so the parent's handle can be closed right away (avoids a leak).
            if devnull is not None:
                devnull.close()

        deadline = time.time() + 10
        while time.time() < deadline:
            try:
                probe = pydisque.client.Client()
                probe.connect()
                probe.info()
                return
            except Exception:
                # Deliberately broad: any failure here is treated as "server
                # not ready yet" and retried until the deadline expires.
                time.sleep(1)

        # Give up: kill the server and reap it so no zombie is left behind,
        # then drop the handle so stop_server() becomes a no-op.
        self.server_process.kill()
        self.server_process.wait()
        self.server_process = None
        raise OSError("Disque not answering after 10 seconds")

    def stop_server(self):
        """Terminate the spawned server, if any, and wait for it to exit.

        Logs a warning when the process exits with a non-zero status.
        """
        if not self.server_process:
            return
        self.server_process.terminate()
        status = self.server_process.wait()
        # Forget the handle so a repeated call does not signal a dead process.
        self.server_process = None
        if status != 0:
            logger.warning("disque exited with %d", status)

    def connect(self, queues_to_watch=None):
        """Create and connect the client used by ``send`` and ``recv``.

        Args:
            queues_to_watch: value later passed to ``get_job`` by ``recv`` as
                the queue(s) to fetch from.
        """
        self.client = pydisque.client.Client()
        self.client.connect()
        self.queues_to_watch = queues_to_watch

    def send(self, queue, message):
        """Add ``message`` as a job on ``queue``."""
        self.client.add_job(queue, message)

    class Job(object):
        """A received job together with the client needed to settle it."""

        def __init__(self, queue, id, payload, client):
            self.queue = queue
            self.body = payload
            self.id = id
            self.client = client

        def done(self):
            """Acknowledge the job via ``client.ack_job``."""
            return self.client.ack_job(self.id)

        def requeue(self):
            """Negatively acknowledge the job via ``client.nack``."""
            # NOTE(review): upstream pydisque is believed to expose this as
            # `nack_job`; confirm the vendored ./lib/pydisque provides `nack`.
            return self.client.nack(self.id)

    def recv(self, timeout=0):
        """Fetch one job from the watched queues.

        Args:
            timeout: currently unused; ``get_job`` is always called with a
                fixed ``timeout=10``.
                NOTE(review): confirm whether this parameter should be
                forwarded, and which time unit ``get_job`` expects.

        Returns:
            A ``Job`` wrapping the first job returned by ``get_job``, or
            ``None`` when no job was returned.
        """
        jobs = self.client.get_job(self.queues_to_watch, timeout=10)
        if not jobs:
            return None
        queue, id_, payload = jobs[0]
        return self.Job(queue, id_, payload, self.client)
# Invoke DisqueTester.main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    DisqueTester.main()