Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion numismatic/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def collect(state, collector, filter, type, output, format, interval):
collector = Collector.factory(collector_name, source_stream=output_stream,
path=output, format=format, types=type,
filters=filter, interval=interval)
# state['collectors'].append(collector)


@coin.command()
Expand Down
3 changes: 2 additions & 1 deletion numismatic/collectors/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from .base import Collector
from .file import FileCollector
from .orderbook import OrderBookCollector



__all__ = ["Collector", "FileCollector"]
__all__ = ["Collector", "FileCollector", "OrderBookCollector"]


from ..libs.utils import make_get_subclasses, subclass_factory
Expand Down
1 change: 1 addition & 0 deletions numismatic/collectors/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class FileCollector(Collector):
interval = attr.ib(default=None)

def __attrs_post_init__(self):
super().__attrs_post_init__()
if self.path=='-':
self._opener = lambda: sys.stdout
elif self.path.endswith('.gz'):
Expand Down
60 changes: 60 additions & 0 deletions numismatic/collectors/orderbook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import sys
import gzip
from functools import partial

import attr

from ..orderbooks import OrderBook
from .base import Collector


@attr.s
class OrderBookCollector(Collector):

path = attr.ib(default='-')
format = attr.ib(default='text')
interval = attr.ib(default=None)
order_book = attr.ib(default=attr.Factory(OrderBook))

def __attrs_post_init__(self):
super().__attrs_post_init__()
if self.path=='-':
self._opener = lambda: sys.stdout
elif self.path.endswith('.gz'):
self._opener = partial(gzip.open, self.path, mode='at')
else:
self._opener = partial(open, self.path, mode='at')

self.source_stream = (self.source_stream
.map(self.order_book.update)
.map(lambda ob: (ob.mid_price,
ob.best_bid,
ob.best_ask))
)
if self.format=='text':
self.source_stream = self.source_stream.map(
lambda ev: str(ev)+'\n')
elif self.format=='json':
self.source_stream = self.source_stream.map(
lambda ev: str(ev.json())+'\n')
else:
raise NotImplementedError(f'format={self.format!r}')
if self.interval:
self.source_stream = \
self.source_stream.timed_window(interval=self.interval)
else:
# ensure downstream receives lists rather than elements
self.source_stream = \
self.source_stream.partition(1)
self.source_stream.sink(self.write)

def write(self, data):
# TODO: Use aiofiles for non-blocking IO here
file = self._opener()
try:
for datum in data:
file.write(datum)
file.flush()
finally:
if self.path!='-':
file.close()
51 changes: 40 additions & 11 deletions numismatic/exchanges/gdax.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import websockets

from .base import Exchange
from ..libs.events import Heartbeat, Trade, LimitOrder, CancelOrder
from ..libs.events import Heartbeat, Trade, LimitOrder, CancelOrder, \
MarketDepthUpdate

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -89,25 +90,53 @@ def _handle_packet(self, packet, symbol):
if 'time' in msg:
dt = datetime.strptime(msg['time'], '%Y-%m-%dT%H:%M:%S.%fZ')
timestamp = dt.timestamp()
else:
timestamp = time.time()

if 'type' in msg and msg['type']=='heartbeat':
msg = Heartbeat(exchange=self.exchange, symbol=symbol,
timestamp=timestamp)
self.output_stream.emit(msg)
event = Heartbeat(exchange=self.exchange, symbol=symbol,
timestamp=timestamp)
self.output_stream.emit(event)
elif 'type' in msg and msg['type']=='ticker' and 'trade_id' in msg:
sign = -1 if ('side' in msg and msg['side']=='sell') else 1
price = msg['price']
volume = sign * msg['last_size'] if 'last_size' in msg else 0
trade_id = msg['trade_id']
msg = Trade(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, price=price, volume=volume,
id=trade_id)
self.output_stream.emit(msg)
event = Trade(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, price=price, volume=volume,
id=trade_id)
self.output_stream.emit(event)
elif 'type' in msg and msg['type']=='snapshot':
for price, volume in msg['bids']:
event = LimitOrder(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, price=float(price),
volume=float(volume), id=price)
self.output_stream.emit(event)
for price, volume in msg['asks']:
event = LimitOrder(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, price=float(price),
volume=-float(volume), id=price)
self.output_stream.emit(event)
elif 'type' in msg and msg['type']=='l2update':
for side, price, size in msg['changes']:
if size=='0':
volume = float(size) if side=='buy' else -float(size)
event = LimitOrder(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, price=float(price),
volume=volume, id=price)
else:
# this is actually removing a level
# make up a fake volume to indicate bid or ask
volume = 1.0 if side=='buy' else -1.0
event = CancelOrder(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, id=price)
self.output_stream.emit(event)
elif isinstance(msg, dict):
self.output_stream.emit(msg)
event = msg
self.output_stream.emit(event)
else:
raise NotImplementedError(msg)
return msg
return event


if __name__=='__main__':
Expand All @@ -119,7 +148,7 @@ def _handle_packet(self, packet, symbol):
printer = output_stream.map(print)

bfx = GDAXExchange(output_stream=output_stream)
bfx_btc = bfx.listen('BTC-USD', 'ticker,heartbeat')
bfx_btc = bfx.listen('BTC-USD', 'level2')

loop = asyncio.get_event_loop()
future = asyncio.wait([bfx_btc], timeout=15)
Expand Down
8 changes: 3 additions & 5 deletions numismatic/exchanges/luno.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,19 @@ def _handle_order_book(self, packet, symbol):
super()._handle_packet(packet, symbol)
order_book = json.loads(packet)
if 'asks' in order_book:
sign = -1
for order in order_book['asks']:
id = order['id']
volume = float(order['volume'])
price = sign * float(order['price'])
volume = -1 * float(order['volume'])
price = float(order['price'])
order_ev = LimitOrder(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, price=price,
volume=volume, id=id)
self.output_stream.emit(order_ev)
if 'bids' in order_book:
sign = 1
for order in order_book['bids']:
id = order['id']
volume = float(order['volume'])
price = sign * float(order['price'])
price = float(order['price'])
order_ev = LimitOrder(exchange=self.exchange, symbol=symbol,
timestamp=timestamp, price=price,
volume=volume, id=id)
Expand Down
11 changes: 11 additions & 0 deletions numismatic/libs/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class Trade(Event):
volume = attr.ib()
id = attr.ib(default=None)


@attr.s(slots=True)
class LimitOrder(Event):
exchange = attr.ib()
Expand All @@ -36,9 +37,19 @@ class LimitOrder(Event):
volume = attr.ib()
id = attr.ib()


@attr.s(slots=True)
class CancelOrder(Event):
exchange = attr.ib()
symbol = attr.ib()
timestamp = attr.ib()
id = attr.ib()


@attr.s(slots=True)
class MarketDepthUpdate(Event):
exchange = attr.ib()
symbol = attr.ib()
timestamp = attr.ib()
price = attr.ib()
volume = attr.ib()
76 changes: 76 additions & 0 deletions numismatic/libs/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from heapq import heappush, heappop
from itertools import count

class PriorityQueue:
# Adapted from:
# https://stackoverflow.com/questions/407734/a-generic-priority-queue-for-python
_REMOVED = object() # placeholder for a removed item

def __init__(self):
self._heap = [] # heap of entries
self._entry_finder = {} # mapping of items to entries
self._counter = count() # unique sequence count

def add(self, item, priority=0):
'Add a new item or update the priority of an existing item'
if item in self._entry_finder:
self._remove_item(item)
count = next(self._counter)
entry = [priority, count, item]
self._entry_finder[item] = entry
heappush(self._heap, entry)

def remove(self, item):
'Mark an existing item as REMOVED. Raise LookupError if not found.'
entry = self._entry_finder.pop(item)
entry[-1] = self._REMOVED

def pop(self):
'Remove and return the lowest priority item. Raise LookupError if empty.'
if self:
priority, count, item = heappop(self._heap)
return item
else:
raise LookupError('pop from an empty priority queue')

def peek(self):
return self._heap[0][2] if self else \
LookupError('priority queue is empty')

def copy(self):
pq = self.__class__()
pq._heap = self._heap[:]
pq._entry_finder = self._entry_finder.copy()
return pq

def __bool__(self):
while self._heap and self._heap[0][2] is self._REMOVED:
heappop(self._heap)
return bool(self._heap)

def __iter__(self):
return self

def __next__(self):
try:
return self.pop()
except LookupError:
raise StopIteration



if __name__=='__main__':
pq = PriorityQueue()
pq.add('b', 2)
pq.add('a', 1)
pq.add('c', 3)
pq.add('d', 4)
pq.add('e', 5)
for item in pq.copy():
print(item, end=' ')
print()
pq.remove('b')
pq.remove('d')
print(pq.pop())
print(pq.pop())
print(pq.pop())
72 changes: 72 additions & 0 deletions numismatic/orderbooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import attr

from .libs.events import LimitOrder, CancelOrder
from .libs.queue import PriorityQueue


@attr.s
class OrderBook:

orders = attr.ib(default=attr.Factory(dict))
bids = attr.ib(default=attr.Factory(PriorityQueue))
asks = attr.ib(default=attr.Factory(PriorityQueue))

def update(self, order):
if isinstance(order, LimitOrder):
side = self.bids if order.volume>0 else self.asks
price = -order.price if order.volume>0 else order.price
side.add(order.id, price)
self.orders[order.id] = order
elif isinstance(order, CancelOrder):
try:
order = self.orders[order.id]
except KeyError:
# Cancelling an order that doesn't exist
return self
side = self.bids if order.volume>0 else self.asks
side.remove(order.id)
del self.orders[order.id]
else:
raise NotImplementedError(type(order))
return self

@property
def best_bid(self):
return self.orders[self.bids.peek()].price if self.bids else \
float('nan')

@property
def best_ask(self):
return self.orders[self.asks.peek()].price if self.asks else \
float('nan')

@property
def mid_price(self):
return (self.best_bid+self.best_ask)/2


if __name__=='__main__':
import time
import random
random.seed(5)
prices = [random.randint(-5, 5) for i in range(10)]
print(prices)
mid_price = 0
ob = OrderBook()
for i, p in enumerate(prices):
# Treat orders below 5 as bids and above as asks
o = LimitOrder('test', 'BTCUSD', time.time(), p,
(1 if p<mid_price else -1)*10*i , i)
print(o)
ob.update(o)
print(ob)
print(ob.best_bid(), ob.best_ask())
print()
for i, p in enumerate(prices):
o = CancelOrder('test', 'BTCUSD', time.time(), i)
print(o)
ob.update(o)
print(ob)
print(ob.best_bid(), ob.best_ask())
print()
print(prices)