diff --git a/numismatic/cli.py b/numismatic/cli.py index d179967..32764f9 100644 --- a/numismatic/cli.py +++ b/numismatic/cli.py @@ -238,7 +238,6 @@ def collect(state, collector, filter, type, output, format, interval): collector = Collector.factory(collector_name, source_stream=output_stream, path=output, format=format, types=type, filters=filter, interval=interval) - # state['collectors'].append(collector) @coin.command() diff --git a/numismatic/collectors/__init__.py b/numismatic/collectors/__init__.py index 82ee4e8..c2c984c 100644 --- a/numismatic/collectors/__init__.py +++ b/numismatic/collectors/__init__.py @@ -1,9 +1,10 @@ from .base import Collector from .file import FileCollector +from .orderbook import OrderBookCollector -__all__ = ["Collector", "FileCollector"] +__all__ = ["Collector", "FileCollector", "OrderBookCollector"] from ..libs.utils import make_get_subclasses, subclass_factory diff --git a/numismatic/collectors/file.py b/numismatic/collectors/file.py index dfce2ef..477fa90 100644 --- a/numismatic/collectors/file.py +++ b/numismatic/collectors/file.py @@ -15,6 +15,7 @@ class FileCollector(Collector): interval = attr.ib(default=None) def __attrs_post_init__(self): + super().__attrs_post_init__() if self.path=='-': self._opener = lambda: sys.stdout elif self.path.endswith('.gz'): diff --git a/numismatic/collectors/orderbook.py b/numismatic/collectors/orderbook.py new file mode 100644 index 0000000..ae9740c --- /dev/null +++ b/numismatic/collectors/orderbook.py @@ -0,0 +1,60 @@ +import sys +import gzip +from functools import partial + +import attr + +from ..orderbooks import OrderBook +from .base import Collector + + +@attr.s +class OrderBookCollector(Collector): + + path = attr.ib(default='-') + format = attr.ib(default='text') + interval = attr.ib(default=None) + order_book = attr.ib(default=attr.Factory(OrderBook)) + + def __attrs_post_init__(self): + super().__attrs_post_init__() + if self.path=='-': + self._opener = lambda: sys.stdout + elif self.path.endswith('.gz'): + self._opener = partial(gzip.open, self.path, mode='at') + else: + self._opener = partial(open, self.path, mode='at') + + self.source_stream = (self.source_stream + .map(self.order_book.update) + .map(lambda ob: (ob.mid_price, + ob.best_bid, + ob.best_ask)) + ) + if self.format=='text': + self.source_stream = self.source_stream.map( + lambda ev: str(ev)+'\n') + elif self.format=='json': + self.source_stream = self.source_stream.map( + lambda ev: str(ev.json())+'\n') + else: + raise NotImplementedError(f'format={self.format!r}') + if self.interval: + self.source_stream = \ + self.source_stream.timed_window(interval=self.interval) + else: + # ensure downstream receives lists rather than elements + self.source_stream = \ + self.source_stream.partition(1) + self.source_stream.sink(self.write) + + def write(self, data): + # TODO: Use aiofiles for non-blocking IO here + file = self._opener() + try: + for datum in data: + file.write(datum) + file.flush() + finally: + if self.path!='-': + file.close() diff --git a/numismatic/exchanges/gdax.py b/numismatic/exchanges/gdax.py index e78427a..e1724cd 100644 --- a/numismatic/exchanges/gdax.py +++ b/numismatic/exchanges/gdax.py @@ -9,7 +9,8 @@ import websockets from .base import Exchange -from ..libs.events import Heartbeat, Trade, LimitOrder, CancelOrder +from ..libs.events import Heartbeat, Trade, LimitOrder, CancelOrder, \ + MarketDepthUpdate logger = logging.getLogger(__name__) @@ -89,25 +90,53 @@ def _handle_packet(self, packet, symbol): if 'time' in msg: dt = datetime.strptime(msg['time'], '%Y-%m-%dT%H:%M:%S.%fZ') timestamp = dt.timestamp() + else: + timestamp = time.time() if 'type' in msg and msg['type']=='heartbeat': - msg = Heartbeat(exchange=self.exchange, symbol=symbol, - timestamp=timestamp) - self.output_stream.emit(msg) + event = Heartbeat(exchange=self.exchange, symbol=symbol, + timestamp=timestamp) + self.output_stream.emit(event) elif 'type' in msg and msg['type']=='ticker' and 'trade_id' in msg: sign = -1 if ('side' in msg and msg['side']=='sell') else 1 price = msg['price'] volume = sign * msg['last_size'] if 'last_size' in msg else 0 trade_id = msg['trade_id'] - msg = Trade(exchange=self.exchange, symbol=symbol, - timestamp=timestamp, price=price, volume=volume, - id=trade_id) - self.output_stream.emit(msg) + event = Trade(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=price, volume=volume, + id=trade_id) + self.output_stream.emit(event) + elif 'type' in msg and msg['type']=='snapshot': + for price, volume in msg['bids']: + event = LimitOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=float(price), + volume=float(volume), id=price) + self.output_stream.emit(event) + for price, volume in msg['asks']: + event = LimitOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=float(price), + volume=-float(volume), id=price) + self.output_stream.emit(event) + elif 'type' in msg and msg['type']=='l2update': + for side, price, size in msg['changes']: + if size=='0': + volume = float(size) if side=='buy' else -float(size) + event = LimitOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=float(price), + volume=volume, id=price) + else: + # this is actually removing a level + # make up a fake volume to indicate bid or ask + volume = 1.0 if side=='buy' else -1.0 + event = CancelOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, id=price) + self.output_stream.emit(event) elif isinstance(msg, dict): - self.output_stream.emit(msg) + event = msg + self.output_stream.emit(event) else: raise NotImplementedError(msg) - return msg + return event if __name__=='__main__': @@ -119,7 +148,7 @@ def _handle_packet(self, packet, symbol): printer = output_stream.map(print) bfx = GDAXExchange(output_stream=output_stream) - bfx_btc = bfx.listen('BTC-USD', 'ticker,heartbeat') + bfx_btc = bfx.listen('BTC-USD', 'level2') loop = asyncio.get_event_loop() future = asyncio.wait([bfx_btc], timeout=15) diff --git a/numismatic/exchanges/luno.py b/numismatic/exchanges/luno.py index 08c432a..a1c4142 100644 --- a/numismatic/exchanges/luno.py +++ b/numismatic/exchanges/luno.py @@ -61,21 +61,19 @@ def _handle_order_book(self, packet, symbol): super()._handle_packet(packet, symbol) order_book = json.loads(packet) if 'asks' in order_book: - sign = -1 for order in order_book['asks']: id = order['id'] - volume = float(order['volume']) - price = sign * float(order['price']) + volume = -1 * float(order['volume']) + price = float(order['price']) order_ev = LimitOrder(exchange=self.exchange, symbol=symbol, timestamp=timestamp, price=price, volume=volume, id=id) self.output_stream.emit(order_ev) if 'bids' in order_book: - sign = 1 for order in order_book['bids']: id = order['id'] volume = float(order['volume']) - price = sign * float(order['price']) + price = float(order['price']) order_ev = LimitOrder(exchange=self.exchange, symbol=symbol, timestamp=timestamp, price=price, volume=volume, id=id) diff --git a/numismatic/libs/events.py b/numismatic/libs/events.py index bce52c3..b6c83c6 100644 --- a/numismatic/libs/events.py +++ b/numismatic/libs/events.py @@ -27,6 +27,7 @@ class Trade(Event): volume = attr.ib() id = attr.ib(default=None) + @attr.s(slots=True) class LimitOrder(Event): exchange = attr.ib() @@ -36,9 +37,19 @@ class LimitOrder(Event): volume = attr.ib() id = attr.ib() + @attr.s(slots=True) class CancelOrder(Event): exchange = attr.ib() symbol = attr.ib() timestamp = attr.ib() id = attr.ib() + + +@attr.s(slots=True) +class MarketDepthUpdate(Event): + exchange = attr.ib() + symbol = attr.ib() + timestamp = attr.ib() + price = attr.ib() + volume = attr.ib() diff --git a/numismatic/libs/queue.py b/numismatic/libs/queue.py new file mode 100644 index 0000000..3645471 --- /dev/null +++ b/numismatic/libs/queue.py @@ -0,0 +1,76 @@ +from heapq import heappush, heappop +from itertools import count + +class PriorityQueue: + # Adapted from: + # https://stackoverflow.com/questions/407734/a-generic-priority-queue-for-python + _REMOVED = object() # placeholder for a removed item + + def __init__(self): + self._heap = [] # heap of entries + self._entry_finder = {} # mapping of items to entries + self._counter = count() # unique sequence count + + def add(self, item, priority=0): + 'Add a new item or update the priority of an existing item' + if item in self._entry_finder: + self._remove_item(item) + count = next(self._counter) + entry = [priority, count, item] + self._entry_finder[item] = entry + heappush(self._heap, entry) + + def remove(self, item): + 'Mark an existing item as REMOVED. Raise LookupError if not found.' + entry = self._entry_finder.pop(item) + entry[-1] = self._REMOVED + + def pop(self): + 'Remove and return the lowest priority item. Raise LookupError if empty.' + if self: + priority, count, item = heappop(self._heap) + return item + else: + raise LookupError('pop from an empty priority queue') + + def peek(self): + return self._heap[0][2] if self else \ + LookupError('priority queue is empty') + + def copy(self): + pq = self.__class__() + pq._heap = self._heap[:] + pq._entry_finder = self._entry_finder.copy() + return pq + + def __bool__(self): + while self._heap and self._heap[0][2] is self._REMOVED: + heappop(self._heap) + return bool(self._heap) + + def __iter__(self): + return self + + def __next__(self): + try: + return self.pop() + except LookupError: + raise StopIteration + + + +if __name__=='__main__': + pq = PriorityQueue() + pq.add('b', 2) + pq.add('a', 1) + pq.add('c', 3) + pq.add('d', 4) + pq.add('e', 5) + for item in pq.copy(): + print(item, end=' ') + print() + pq.remove('b') + pq.remove('d') + print(pq.pop()) + print(pq.pop()) + print(pq.pop()) diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py new file mode 100644 index 0000000..354bace --- /dev/null +++ b/numismatic/orderbooks.py @@ -0,0 +1,72 @@ +import attr + +from .libs.events import LimitOrder, CancelOrder +from .libs.queue import PriorityQueue + + +@attr.s +class OrderBook: + + orders = attr.ib(default=attr.Factory(dict)) + bids = attr.ib(default=attr.Factory(PriorityQueue)) + asks = attr.ib(default=attr.Factory(PriorityQueue)) + + def update(self, order): + if isinstance(order, LimitOrder): + side = self.bids if order.volume>0 else self.asks + price = -order.price if order.volume>0 else order.price + side.add(order.id, price) + self.orders[order.id] = order + elif isinstance(order, CancelOrder): + try: + order = self.orders[order.id] + except KeyError: + # Cancelling an order that doesn't exist + return self + side = self.bids if order.volume>0 else self.asks + side.remove(order.id) + del self.orders[order.id] + else: + raise NotImplementedError(type(order)) + return self + + @property + def best_bid(self): + return self.orders[self.bids.peek()].price if self.bids else \ + float('nan') + + @property + def best_ask(self): + return self.orders[self.asks.peek()].price if self.asks else \ + float('nan') + + @property + def mid_price(self): + return (self.best_bid+self.best_ask)/2 + + +if __name__=='__main__': + import time + import random + random.seed(5) + prices = [random.randint(-5, 5) for i in range(10)] + print(prices) + mid_price = 0 + ob = OrderBook() + for i, p in enumerate(prices): + # Treat orders below 5 as bids and above as asks + o = LimitOrder('test', 'BTCUSD', time.time(), p, + (1 if p