Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a4fdde0
add benchmarking stuff
fijal Mar 15, 2016
c461f89
Add checking for 200 replies
Mar 15, 2016
3826fef
resolve conflict
fijal Mar 15, 2016
4141ae4
undo the breakage
fijal Mar 15, 2016
7bd7e67
undo more breakage
fijal Mar 15, 2016
eabcdde
add some data points
fijal Mar 15, 2016
0bb796d
Merge branch 'develop' into feature/issue-82-benchmarking
Mar 15, 2016
59245b4
work on concurrency
fijal Mar 15, 2016
8ff1487
improve the reporting
fijal Mar 16, 2016
cc9a69e
Add benchmark running script
Mar 16, 2016
44fac4a
Some SMPP benchmark things.
Mar 16, 2016
d621a7d
always close the socket
fijal Mar 17, 2016
4ee2c02
improve the smpp benchmark
fijal Mar 17, 2016
34ca798
add max_rss
fijal Mar 17, 2016
7cbe56e
convert
fijal Mar 17, 2016
3292fe3
fixes
fijal Mar 17, 2016
9afef32
Combine ussd and smpp benchmark runners into one
Mar 18, 2016
4a8e668
Merge branch 'develop' into feature/issue-82-benchmarking
Apr 25, 2016
03a353c
Merge branch 'develop' into MOMZA-847-add-routing-to-benchmark
erikh360 Jan 15, 2018
cdc0e66
updating benchmark
erikh360 Jan 16, 2018
a4b0c5c
Merge branch 'develop' into MOMZA-847-add-routing-to-benchmark
erikh360 Jan 18, 2018
14c1112
update benchmarking
erikh360 Jan 19, 2018
102989f
updates
erikh360 Jan 19, 2018
a7a3d14
remove docker-compose
erikh360 Jan 19, 2018
5276b11
Merge branch 'develop' into MOMZA-847-add-routing-to-benchmark
erikh360 Jan 24, 2018
c11db12
add router type
erikh360 Jan 24, 2018
205aaaf
add extra destinations
erikh360 Jan 25, 2018
f1c07d0
Merge pull request #168 from praekelt/MOMZA-847-add-routing-to-benchmark
erikh360 Jan 25, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ _trial_temp
.cache/
.coverage
ve/
.DS_Store
74 changes: 74 additions & 0 deletions bench/application_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import json
import socket
import sys
import argparse


def start_server(port=8002, response_port=8001, router=None, destination=None):
serversocket = socket.socket()
serversocket.bind(('localhost', port))
serversocket.listen(1)

while True:
connection, address = serversocket.accept()
content = connection.recv(1024)
connection.send("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
connection.close()
send_response(content, response_port, router, destination)


def send_response(content, port, router=None, destination=None):
content = json.loads(content.splitlines()[-1])
reply = json.dumps({
"reply_to": content.get('message_id'),
"content": "reply message",
"channel_data": {
"session_event": "close",
},
})

s = socket.socket()
s.connect(('localhost', 8080))

if (router and destination):
s.send(
"POST /routers/%s/destinations/%s/messages/ HTTP/1.1\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"\r\n%s\r\n\r\n" % (router, destination, len(reply) + 2, reply))
else:
channel_id = content['channel_id']
s.send(
"POST /channels/%s/messages/ HTTP/1.1\r\n"
"Content-Type: application/json\r\n"
"Content-Length: %d\r\n"
"\r\n%s\r\n\r\n" % (channel_id, len(reply) + 2, reply))
reply = s.recv(1024)
try:
assert (reply.splitlines()[0] == 'HTTP/1.1 200 OK' or
reply.splitlines()[0] == 'HTTP/1.1 201 Created')
finally:
s.close()


def parse_arguments(args):
parser = argparse.ArgumentParser(
description=(
'Fake Application to handle junebug messages.'))
parser.add_argument(
'--router', dest='router', type=str, default=None,
help='Router that was created')
parser.add_argument(
'--destination', dest='destination', type=str, default=None,
help='Destination that was created')

return parser.parse_args(args)


def main():
config = parse_arguments(sys.argv[1:])
start_server(router=config.router, destination=config.destination)


if __name__ == '__main__':
main()
113 changes: 113 additions & 0 deletions bench/distribution.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions bench/pypy-cold-hacks.dat

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions bench/pypy-warm-hacks.dat

Large diffs are not rendered by default.

278 changes: 278 additions & 0 deletions bench/run_benchmarks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
from subprocess import Popen, PIPE
import httplib
import json
import os
import time
import re
import math
import sys
import argparse


class Process(object):
def get_command(self):
'''Subclasses implement. Returns a list representing start command.'''
return []

def start(self, stdout=PIPE, extra_commands=[]):
env = os.environ.copy()
env['JUNEBUG_DISABLE_LOGGING'] = 'true'
command = self.get_command()
command.extend(extra_commands)
self.process = Popen(
command, env=env, stdout=stdout)
self.post_start()

def get_rss(self):
if not sys.platform.startswith('linux'):
return 0
pid = self.process.pid
with open("/proc/%d/status" % pid) as f:
d = f.read()
start = d.find('RSS')
end = d.find('\n', start)
m = re.search("(\d+)\s+([kKmM])B", d[start:end])
if m.group(2) in 'kK':
coef = 0.001
else:
coef = 1.0
return int(math.ceil((float(m.group(1))) * coef))

def post_start(self):
'''Subclasses implement. What to do after starting process.'''
pass

def stop(self):
self.process.terminate()


class Junebug(Process):
def __init__(self, config):
self.config = config
self.conn = httplib.HTTPConnection('localhost', port=8080)

def get_command(self):
return ['jb']

def post_start(self):
# This is horrible
time.sleep(2)

def create_channel(self):
if self.config.channel_type == 'ussd':
self.conn.request(
"POST", '/channels/',
json.dumps({
'type': 'dmark',
'config': {
'web_path': 'api',
'web_port': 8001
},
'mo_url': 'http://localhost:8002',
}),
{'Content-Type': 'application/json'})
elif self.config.channel_type == 'router':
self.conn.request(
"POST", '/channels/',
json.dumps({
'type': 'dmark',
'config': {
'web_path': 'api',
'web_port': 8001
},
}),
{'Content-Type': 'application/json'})
elif self.config.channel_type == 'smpp':
self.conn.request(
"POST", '/channels/',
json.dumps({
"type": "smpp",
"mo_url": "http://localhost:8002",
"config": {
"system_id": "smppclient1",
"password": "password",
"twisted_endpoint": "tcp:localhost:2775"
}
}),
{'Content-Type': 'application/json'})
else:
raise RuntimeError(
'Invalid channel type %r' % self.config.channel_type)
r = self.conn.getresponse()
assert r.status == 201
channel = json.loads(r.read())['result']['id']
self._wait_for_start()
return channel

def create_routing(self, channel_id, extra_destinations):
self.conn.request(
"POST", '/routers/',
json.dumps({
"config": {
"channel": channel_id
},
"type": "from_address"
}),
{'Content-Type': 'application/json'})
r = self.conn.getresponse()
assert r.status == 201
router = json.loads(r.read())['result']['id']

self._wait_for_start("router")

for i in range(0, extra_destinations):
self.conn.request(
"POST", '/routers/%s/destinations/' % router,
json.dumps({
"mo_url": "http://localhost:8%02d3" % i,
"config": {
"regular_expression": "$^"
}
}),
{'Content-Type': 'application/json'})

r = self.conn.getresponse()

assert r.status == 201
destination = json.loads(r.read())['result']['id']
self._wait_for_start(
"extra destination {}".format(destination))

self.conn.request(
"POST", '/routers/%s/destinations/' % router,
json.dumps({
"mo_url": "http://localhost:8002",
"config": {
"regular_expression": "[^\n]+"
}
}),
{'Content-Type': 'application/json'})

r = self.conn.getresponse()
assert r.status == 201
destination = json.loads(r.read())['result']['id']

self._wait_for_start("destination")
return router, destination

def _wait_for_start(self, item='channel'):
# This is horrible
print 'Waiting for {} to start'.format(item)
time.sleep(2)

def delete_ussd_channel(self, channelid):
self.conn.request(
"DELETE", '/channels/%s' % channelid)
r = self.conn.getresponse()
assert r.status == 200

def delete_routing(self, router_id):
self.conn.request(
"DELETE", '/routers/%s' % router_id)
r = self.conn.getresponse()
assert r.status == 200


class FakeApplicationServer(Process):

def __init__(self, router=None, destination=None):
self.router = router
self.destination = destination

def get_command(self):
command = ['python', 'application_server.py']
if self.router and self.destination:
command.extend([
'--router', str(self.router),
'--destination', str(self.destination)])
return command


class BenchmarkRunner(Process):
def __init__(self, config):
self.config = config

def get_command(self):
if self.config.channel_type in ('ussd', 'router'):
command = ['python', 'submit_message.py']
elif self.config.channel_type == 'smpp':
command = ['python', 'submit_message_smpp.py']
command.extend([
'--end-id', str(self.config.test_length),
'--warmup', str(self.config.warmup)])
return command

def print_results(self):
for line in iter(self.process.stdout.readline, ''):
print line.rstrip('\n')


def parse_arguments(args):
parser = argparse.ArgumentParser(
description=(
'Runs the Junebug benchmarks and print out the results.'))
parser.add_argument(
'--channel-type', dest='channel_type', type=str, default='ussd',
help='The type of channel to benchmark. Either ussd or smpp')
parser.add_argument(
'--test-length', dest='test_length', type=int, default=10000,
help='The number of messages to send per benchmark.')
parser.add_argument(
'--warmup', dest='warmup', default=3000,
help='Number of iterations to discard for statistics')
parser.add_argument(
'--concurrency', dest='concurrency', type=int, default=[2, 5, 10, 20],
nargs='+', help='The list of concurrencies to test')
parser.add_argument(
'--extra-destinations', dest='extra_destinations', type=int, default=5,
help='Extra destinations to add to the router')
return parser.parse_args(args)


def main():
config = parse_arguments(sys.argv[1:])
try:
print 'Starting Junebug benchmark...'
jb = Junebug(config)
jb.start()

ch = jb.create_channel()

if config.channel_type == 'router':
rt, dest = jb.create_routing(ch, config.extra_destinations)
app = FakeApplicationServer(rt, dest)
else:
app = FakeApplicationServer()

app.start()

for concurrency in config.concurrency:
print 'Running benchmark with concurrency %d' % concurrency
benchmark = BenchmarkRunner(config)
max_rss = 0
benchmark.start(
stdout=None, extra_commands=[
'--concurrency=%d' % concurrency])
while benchmark.process.poll() is None:
try:
max_rss = max(max_rss, benchmark.get_rss())
except (OSError, IOError):
pass # possible race condition?
time.sleep(0.2)
if sys.platform.startswith('linux'):
print "Max memory: %d" % max_rss

jb.delete_ussd_channel(ch)

if config.channel_type == 'router':
jb.delete_routing(rt)
finally:
jb.stop()
app.stop()
try:
benchmark.stop()
except:
pass

if __name__ == '__main__':
main()
Loading