From c5bf37801410c743976ebb81f6b9eb4114dccc0f Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Wed, 18 Oct 2017 20:43:52 +0000 Subject: [PATCH 01/18] Adds TODO and FIXME notes --- numismatic/collectors/base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/numismatic/collectors/base.py b/numismatic/collectors/base.py index f51dc63..73e49b6 100644 --- a/numismatic/collectors/base.py +++ b/numismatic/collectors/base.py @@ -8,6 +8,8 @@ class Collector: types = attr.ib(default=attr.Factory(list)) filters = attr.ib(default=attr.Factory(list)) + # FIXME: Add a factory method + def __attrs_post_init__(self): if self.types: self.source_stream = self.source_stream.filter( From c742a7b54e3cde7ab2ca69ab8a7b0738e756fa7d Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Thu, 19 Oct 2017 05:02:31 +0000 Subject: [PATCH 02/18] Adds a basic OrderBook implementation --- numismatic/orderbooks.py | 60 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 numismatic/orderbooks.py diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py new file mode 100644 index 0000000..0c19290 --- /dev/null +++ b/numismatic/orderbooks.py @@ -0,0 +1,60 @@ +from functools import partial +from itertools import chain +from collections import defaultdict + +import attr + +from .libs.events import LimitOrder, CancelOrder + + +@attr.s +class OrderBook: + + orders = attr.ib(default=attr.Factory(dict)) + levels = attr.ib(default=attr.Factory(partial(defaultdict, list))) + bids = attr.ib(default=attr.Factory(list)) + asks = attr.ib(default=attr.Factory(list)) + + def update(self, order): + if isinstance(order, LimitOrder): + self.orders[order.id] = order + level = self.levels[order.price] + level.append(order.id) + side = self.bids if order.volume>0 else self.asks + side.append(order.price) + elif isinstance(order, CancelOrder): + order = self.orders[order.id] + del self.orders[order.id] + level = self.levels[order.price] + level.remove(order.id) + if not level: + del self.levels[order.price] + side = self.bids if order.volume>0 else self.asks + side.remove(order.price) + else: + raise NotImplementedError(f'order={order}') + + def best_bid(self): + return max(self.bids) if self.bids else float('nan') + + + def best_ask(self): + return min(self.asks) if self.asks else float('nan') + + +if __name__=='__main__': + import time + prices = [1, 10, 2, 2, 9, 9] + ob = OrderBook() + for i, p in enumerate(prices): + # Treat orders below 5 as bids and above as asks + o = LimitOrder('test', 'BTCUSD', time.time(), p, + (1 if p<5 else -1)*10*p , i) + ob.update(o) + print('\n', ob.best_bid(), ob.best_ask()) + print(ob) + for i, p in enumerate(prices): + c = CancelOrder('test', 'BTCUSD', time.time(), i) + ob.update(c) + print('\n', ob.best_bid(), ob.best_ask()) + print(ob) From 82de1245597cfed6448ae4d82e371b4f2349ce0c Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Thu, 19 Oct 2017 05:10:46 +0000 Subject: [PATCH 03/18] Streamlines the OrderBook code --- numismatic/orderbooks.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index 0c19290..632af45 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -11,26 +11,23 @@ class OrderBook: orders = attr.ib(default=attr.Factory(dict)) - levels = attr.ib(default=attr.Factory(partial(defaultdict, list))) - bids = attr.ib(default=attr.Factory(list)) - asks = attr.ib(default=attr.Factory(list)) + bids = attr.ib(default=attr.Factory(partial(defaultdict, list))) + asks = attr.ib(default=attr.Factory(partial(defaultdict, list))) def update(self, order): if isinstance(order, LimitOrder): self.orders[order.id] = order - level = self.levels[order.price] + levels = self.bids if order.volume>0 else self.asks + level = levels[order.price] level.append(order.id) - side = self.bids if order.volume>0 else self.asks - side.append(order.price) elif isinstance(order, CancelOrder): order = self.orders[order.id] del self.orders[order.id] - level = self.levels[order.price] + levels = self.bids if order.volume>0 else self.asks + level = levels[order.price] level.remove(order.id) if not level: - del self.levels[order.price] - side = self.bids if order.volume>0 else self.asks - side.remove(order.price) + del levels[order.price] else: raise NotImplementedError(f'order={order}') From 6a3cbcfaa8d4459a8f2dbc571f5a9e1a2fe3ec78 Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Thu, 19 Oct 2017 06:41:17 +0000 Subject: [PATCH 04/18] Improves efficiency of order book by maintaining sorted list of bids and asks --- numismatic/orderbooks.py | 49 ++++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index 632af45..2cef59b 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -1,6 +1,7 @@ from functools import partial from itertools import chain from collections import defaultdict +import bisect import attr @@ -11,47 +12,65 @@ class OrderBook: orders = attr.ib(default=attr.Factory(dict)) - bids = attr.ib(default=attr.Factory(partial(defaultdict, list))) - asks = attr.ib(default=attr.Factory(partial(defaultdict, list))) + levels = attr.ib(default=attr.Factory(partial(defaultdict, list))) + bids = attr.ib(default=attr.Factory(list)) + asks = attr.ib(default=attr.Factory(list)) def update(self, order): if isinstance(order, LimitOrder): self.orders[order.id] = order - levels = self.bids if order.volume>0 else self.asks - level = levels[order.price] + level = self.levels[order.price] + if not level: + # a new level so need to update bids or asks + side = self.bids if order.volume>0 else self.asks + price = -order.price if order.volume>0 else order.price + position = bisect.bisect(side, price) + side.insert(position, price) level.append(order.id) elif isinstance(order, CancelOrder): order = self.orders[order.id] del self.orders[order.id] - levels = self.bids if order.volume>0 else self.asks - level = levels[order.price] + level = self.levels[order.price] level.remove(order.id) if not level: - del levels[order.price] + del self.levels[order.price] + side = self.bids if order.volume>0 else self.asks + price = -order.price if order.volume>0 else order.price + position = bisect.bisect(side, price) + side.pop(position-1) else: raise NotImplementedError(f'order={order}') def best_bid(self): - return max(self.bids) if self.bids else float('nan') + return -self.bids[0] if self.bids else float('nan') def best_ask(self): - return min(self.asks) if self.asks else float('nan') + return self.asks[0] if self.asks else float('nan') if __name__=='__main__': import time - prices = [1, 10, 2, 2, 9, 9] + import random + random.seed(5) + prices = [random.randint(-5, 5) for i in range(10)] + print(prices) + mid_price = 0 ob = OrderBook() for i, p in enumerate(prices): # Treat orders below 5 as bids and above as asks o = LimitOrder('test', 'BTCUSD', time.time(), p, - (1 if p<5 else -1)*10*p , i) + (1 if p Date: Thu, 19 Oct 2017 21:45:22 +0000 Subject: [PATCH 05/18] Adds an experimental OrderBookCollector --- numismatic/cli.py | 1 - numismatic/collectors/__init__.py | 3 ++- numismatic/collectors/base.py | 2 -- numismatic/collectors/orderbook.py | 17 +++++++++++++++++ numismatic/orderbooks.py | 6 +++++- 5 files changed, 24 insertions(+), 5 deletions(-) create mode 100644 numismatic/collectors/orderbook.py diff --git a/numismatic/cli.py b/numismatic/cli.py index d179967..32764f9 100644 --- a/numismatic/cli.py +++ b/numismatic/cli.py @@ -238,7 +238,6 @@ def collect(state, collector, filter, type, output, format, interval): collector = Collector.factory(collector_name, source_stream=output_stream, path=output, format=format, types=type, filters=filter, interval=interval) - # state['collectors'].append(collector) @coin.command() diff --git a/numismatic/collectors/__init__.py b/numismatic/collectors/__init__.py index 82ee4e8..c2c984c 100644 --- a/numismatic/collectors/__init__.py +++ b/numismatic/collectors/__init__.py @@ -1,9 +1,10 @@ from .base import Collector from .file import FileCollector +from .orderbook import OrderBookCollector -__all__ = ["Collector", "FileCollector"] +__all__ = ["Collector", "FileCollector", "OrderBookCollector"] from ..libs.utils import make_get_subclasses, subclass_factory diff --git a/numismatic/collectors/base.py b/numismatic/collectors/base.py index 73e49b6..f51dc63 100644 --- a/numismatic/collectors/base.py +++ b/numismatic/collectors/base.py @@ -8,8 +8,6 @@ class Collector: types = attr.ib(default=attr.Factory(list)) filters = attr.ib(default=attr.Factory(list)) - # FIXME: Add a factory method - def __attrs_post_init__(self): if self.types: self.source_stream = self.source_stream.filter( diff --git a/numismatic/collectors/orderbook.py b/numismatic/collectors/orderbook.py new file mode 100644 index 0000000..82be106 --- /dev/null +++ b/numismatic/collectors/orderbook.py @@ -0,0 +1,17 @@ +import attr + +from ..orderbooks import OrderBook + + +@attr.s +class OrderBookCollector: + + order_book = attr.ib(default=attr.Factory(OrderBook)) + + def __attrs_post_init__(self): + self.sourc_stream = (self.source_stream + .map(self.order_book.update) + .map(lambda ob: print(ob.mid_price(), + ob.best_bid(), + ob.best_ask())) + ) diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index 2cef59b..8212a9b 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -39,7 +39,8 @@ def update(self, order): position = bisect.bisect(side, price) side.pop(position-1) else: - raise NotImplementedError(f'order={order}') + raise NotImplementedError(type(order)) + return self def best_bid(self): return -self.bids[0] if self.bids else float('nan') @@ -48,6 +49,9 @@ def best_bid(self): def best_ask(self): return self.asks[0] if self.asks else float('nan') + def mid_price(self): + return (self.best_bid()+self.best_ask())/2 + if __name__=='__main__': import time From 0dc7a1f83b77b66623bbd6c276aacc1800904488 Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Thu, 19 Oct 2017 22:29:13 +0000 Subject: [PATCH 06/18] Adds a very experimental OrderBookCollector --- numismatic/collectors/orderbook.py | 56 ++++++++++++++++++++++++++---- numismatic/orderbooks.py | 1 - 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/numismatic/collectors/orderbook.py b/numismatic/collectors/orderbook.py index 82be106..3ad4639 100644 --- a/numismatic/collectors/orderbook.py +++ b/numismatic/collectors/orderbook.py @@ -1,17 +1,59 @@ +import sys +import gzip +from functools import partial + import attr from ..orderbooks import OrderBook +from .base import Collector @attr.s -class OrderBookCollector: +class OrderBookCollector(Collector): + path = attr.ib(default='-') + format = attr.ib(default='text') + interval = attr.ib(default=None) order_book = attr.ib(default=attr.Factory(OrderBook)) def __attrs_post_init__(self): - self.sourc_stream = (self.source_stream - .map(self.order_book.update) - .map(lambda ob: print(ob.mid_price(), - ob.best_bid(), - ob.best_ask())) - ) + if self.path=='-': + self._opener = lambda: sys.stdout + elif self.path.endswith('.gz'): + self._opener = partial(gzip.open, self.path, mode='at') + else: + self._opener = partial(open, self.path, mode='at') + + self.source_stream = (self.source_stream + .map(self.order_book.update) + .map(lambda ob: (ob.mid_price(), + ob.best_bid(), + ob.best_ask())) + ) + if self.format=='text': + self.source_stream = self.source_stream.map( + lambda ev: str(ev)+'\n') + elif self.format=='json': + self.source_stream = self.source_stream.map( + lambda ev: str(ev.json())+'\n') + else: + raise NotImplementedError(f'format={self.format!r}') + if self.interval: + self.source_stream = \ + self.source_stream.timed_window(interval=self.interval) + else: + # ensure downstream receives lists rather than elements + self.source_stream = \ + self.source_stream.partition(1) + self.source_stream.sink(self.write) + + def write(self, data): + # TODO: Use aiofiles for non-blocking IO here + file = self._opener() + try: + for datum in data: + file.write(datum) + file.flush() + finally: + if self.path!='-': + file.close() diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index 8212a9b..fb23766 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -45,7 +45,6 @@ def update(self, order): def best_bid(self): return -self.bids[0] if self.bids else float('nan') - def best_ask(self): return self.asks[0] if self.asks else float('nan') From e0939d492d04360af8a7e7dba27acbd1acfe971f Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Thu, 19 Oct 2017 22:40:14 +0000 Subject: [PATCH 07/18] Fixes volume for offer orders to be negative --- numismatic/exchanges/luno.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/numismatic/exchanges/luno.py b/numismatic/exchanges/luno.py index 08c432a..a1c4142 100644 --- a/numismatic/exchanges/luno.py +++ b/numismatic/exchanges/luno.py @@ -61,21 +61,19 @@ def _handle_order_book(self, packet, symbol): super()._handle_packet(packet, symbol) order_book = json.loads(packet) if 'asks' in order_book: - sign = -1 for order in order_book['asks']: id = order['id'] - volume = float(order['volume']) - price = sign * float(order['price']) + volume = -1 * float(order['volume']) + price = float(order['price']) order_ev = LimitOrder(exchange=self.exchange, symbol=symbol, timestamp=timestamp, price=price, volume=volume, id=id) self.output_stream.emit(order_ev) if 'bids' in order_book: - sign = 1 for order in order_book['bids']: id = order['id'] volume = float(order['volume']) - price = sign * float(order['price']) + price = float(order['price']) order_ev = LimitOrder(exchange=self.exchange, symbol=symbol, timestamp=timestamp, price=price, volume=volume, id=id) From 080eac480e120229071076d25e545a0650bfc8fc Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Fri, 20 Oct 2017 04:45:04 +0000 Subject: [PATCH 08/18] Fixes missing call to base class initialization --- numismatic/collectors/file.py | 1 + numismatic/collectors/orderbook.py | 1 + 2 files changed, 2 insertions(+) diff --git a/numismatic/collectors/file.py b/numismatic/collectors/file.py index dfce2ef..477fa90 100644 --- a/numismatic/collectors/file.py +++ b/numismatic/collectors/file.py @@ -15,6 +15,7 @@ class FileCollector(Collector): interval = attr.ib(default=None) def __attrs_post_init__(self): + super().__attrs_post_init__() if self.path=='-': self._opener = lambda: sys.stdout elif self.path.endswith('.gz'): diff --git a/numismatic/collectors/orderbook.py b/numismatic/collectors/orderbook.py index 3ad4639..a743181 100644 --- a/numismatic/collectors/orderbook.py +++ b/numismatic/collectors/orderbook.py @@ -17,6 +17,7 @@ class OrderBookCollector(Collector): order_book = attr.ib(default=attr.Factory(OrderBook)) def __attrs_post_init__(self): + super().__attrs_post_init__() if self.path=='-': self._opener = lambda: sys.stdout elif self.path.endswith('.gz'): From bf2f97a2cffe739330ceaad69f51f3134515c903 Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sat, 21 Oct 2017 17:08:42 +0000 Subject: [PATCH 09/18] Adds basic PriorityQueue implementation --- numismatic/libs/queue.py | 44 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 numismatic/libs/queue.py diff --git a/numismatic/libs/queue.py b/numismatic/libs/queue.py new file mode 100644 index 0000000..863323a --- /dev/null +++ b/numismatic/libs/queue.py @@ -0,0 +1,44 @@ +from heapq import heappush, heappop +from itertools import count + +class PriorityQueue: + _REMOVED = object() # placeholder for a removed item + + def __init__(self): + self._pq = [] # heap of entries + self._entry_finder = {} # mapping of items to entries + self._counter = count() # unique sequence count + + def add(self, item, priority=0): + 'Add a new item or update the priority of an existing item' + if item in self._entry_finder: + self._remove_item(item) + count = next(self._counter) + entry = [priority, count, item] + self._entry_finder[item] = entry + heappush(self._pq, entry) + + def remove(self, item): + 'Mark an existing item as REMOVED. Raise KeyError if not found.' + entry = self._entry_finder.pop(item) + entry[-1] = self._REMOVED + + def pop(self): + 'Remove and return the lowest priority item. Raise KeyError if empty.' + while self._pq: + priority, count, item = heappop(self._pq) + if item is not self._REMOVED: + del self._entry_finder[item] + return item + raise KeyError('pop from an empty priority queue') + + + +if __name__=='__main__': + pq = PriorityQueue() + pq.add('b', 2) + pq.add('a', 1) + pq.add('c', 3) + print(pq.pop()) + print(pq.pop()) + print(pq.pop()) From b339754508b49c6bc18f4fb52fc0dea1d93d519a Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sat, 21 Oct 2017 17:47:00 +0000 Subject: [PATCH 10/18] Adds PriorityQueue.peek() --- numismatic/libs/queue.py | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/numismatic/libs/queue.py b/numismatic/libs/queue.py index 863323a..2e31683 100644 --- a/numismatic/libs/queue.py +++ b/numismatic/libs/queue.py @@ -2,10 +2,12 @@ from itertools import count class PriorityQueue: + # Adapted from: + # https://stackoverflow.com/questions/407734/a-generic-priority-queue-for-python _REMOVED = object() # placeholder for a removed item def __init__(self): - self._pq = [] # heap of entries + self._heap = [] # heap of entries self._entry_finder = {} # mapping of items to entries self._counter = count() # unique sequence count @@ -16,7 +18,7 @@ def add(self, item, priority=0): count = next(self._counter) entry = [priority, count, item] self._entry_finder[item] = entry - heappush(self._pq, entry) + heappush(self._heap, entry) def remove(self, item): 'Mark an existing item as REMOVED. Raise KeyError if not found.' @@ -25,13 +27,20 @@ def remove(self, item): def pop(self): 'Remove and return the lowest priority item. Raise KeyError if empty.' - while self._pq: - priority, count, item = heappop(self._pq) - if item is not self._REMOVED: - del self._entry_finder[item] - return item - raise KeyError('pop from an empty priority queue') + if not self.empty(): + priority, count, item = heappop(self._heap) + return item + else: + raise KeyError('pop from an empty priority queue') + def peek(self): + return self._heap[0][2] if not self.empty() else \ + KeyError('priority queue is empty') + + def empty(self): + while self._heap and self._heap[0][2] is self._REMOVED: + heappop(self._heap) + return not self._heap if __name__=='__main__': @@ -39,6 +48,10 @@ def pop(self): pq.add('b', 2) pq.add('a', 1) pq.add('c', 3) + pq.add('d', 4) + pq.add('e', 5) + pq.remove('b') + pq.remove('d') print(pq.pop()) print(pq.pop()) print(pq.pop()) From b4b361f05531e6e0d1bc6741530d026b98626d60 Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sun, 22 Oct 2017 13:10:37 +0000 Subject: [PATCH 11/18] Implements __iter__ interface in PriorityQueue --- numismatic/libs/queue.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/numismatic/libs/queue.py b/numismatic/libs/queue.py index 2e31683..9de284c 100644 --- a/numismatic/libs/queue.py +++ b/numismatic/libs/queue.py @@ -42,6 +42,22 @@ def empty(self): heappop(self._heap) return not self._heap + def copy(self): + pq = self.__class__() + pq._heap = self._heap[:] + pq._entry_finder = self._entry_finder.copy() + return pq + + def __iter__(self): + return self + + def __next__(self): + try: + return self.pop() + except KeyError: + raise StopIteration + + if __name__=='__main__': pq = PriorityQueue() @@ -50,6 +66,9 @@ def empty(self): pq.add('c', 3) pq.add('d', 4) pq.add('e', 5) + for item in pq.copy(): + print(item, end=' ') + print() pq.remove('b') pq.remove('d') print(pq.pop()) From 4f54f515330cf0a705605111181c4a7c314d32ea Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sun, 22 Oct 2017 13:34:23 +0000 Subject: [PATCH 12/18] Improves OrderBook to use more efficient PriorityQueue --- numismatic/libs/queue.py | 24 ++++++++++++------------ numismatic/orderbooks.py | 39 +++++++++++++-------------------------- 2 files changed, 25 insertions(+), 38 deletions(-) diff --git a/numismatic/libs/queue.py b/numismatic/libs/queue.py index 9de284c..3645471 100644 --- a/numismatic/libs/queue.py +++ b/numismatic/libs/queue.py @@ -21,26 +21,21 @@ def add(self, item, priority=0): heappush(self._heap, entry) def remove(self, item): - 'Mark an existing item as REMOVED. Raise KeyError if not found.' + 'Mark an existing item as REMOVED. Raise LookupError if not found.' entry = self._entry_finder.pop(item) entry[-1] = self._REMOVED def pop(self): - 'Remove and return the lowest priority item. Raise KeyError if empty.' - if not self.empty(): + 'Remove and return the lowest priority item. Raise LookupError if empty.' + if self: priority, count, item = heappop(self._heap) return item else: - raise KeyError('pop from an empty priority queue') + raise LookupError('pop from an empty priority queue') def peek(self): - return self._heap[0][2] if not self.empty() else \ - KeyError('priority queue is empty') - - def empty(self): - while self._heap and self._heap[0][2] is self._REMOVED: - heappop(self._heap) - return not self._heap + return self._heap[0][2] if self else \ + LookupError('priority queue is empty') def copy(self): pq = self.__class__() @@ -48,13 +43,18 @@ def copy(self): pq._entry_finder = self._entry_finder.copy() return pq + def __bool__(self): + while self._heap and self._heap[0][2] is self._REMOVED: + heappop(self._heap) + return bool(self._heap) + def __iter__(self): return self def __next__(self): try: return self.pop() - except KeyError: + except LookupError: raise StopIteration diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index fb23766..e746482 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -1,52 +1,39 @@ -from functools import partial -from itertools import chain -from collections import defaultdict -import bisect - import attr from .libs.events import LimitOrder, CancelOrder +from .libs.queue import PriorityQueue @attr.s class OrderBook: orders = attr.ib(default=attr.Factory(dict)) - levels = attr.ib(default=attr.Factory(partial(defaultdict, list))) - bids = attr.ib(default=attr.Factory(list)) - asks = attr.ib(default=attr.Factory(list)) + bids = attr.ib(default=attr.Factory(PriorityQueue)) + asks = attr.ib(default=attr.Factory(PriorityQueue)) def update(self, order): if isinstance(order, LimitOrder): self.orders[order.id] = order - level = self.levels[order.price] - if not level: - # a new level so need to update bids or asks - side = self.bids if order.volume>0 else self.asks - price = -order.price if order.volume>0 else order.price - position = bisect.bisect(side, price) - side.insert(position, price) - level.append(order.id) + side = self.bids if order.volume>0 else self.asks + price = -order.price if order.volume>0 else order.price + side.add(order.id, price) elif isinstance(order, CancelOrder): order = self.orders[order.id] del self.orders[order.id] - level = self.levels[order.price] - level.remove(order.id) - if not level: - del self.levels[order.price] - side = self.bids if order.volume>0 else self.asks - price = -order.price if order.volume>0 else order.price - position = bisect.bisect(side, price) - side.pop(position-1) + side = self.bids if order.volume>0 else self.asks + price = -order.price if order.volume>0 else order.price + side.remove(order.id) else: raise NotImplementedError(type(order)) return self def best_bid(self): - return -self.bids[0] if self.bids else float('nan') + return self.orders[self.bids.peek()].price if self.bids else \ + float('nan') def best_ask(self): - return self.asks[0] if self.asks else float('nan') + return self.orders[self.asks.peek()].price if self.asks else \ + float('nan') def mid_price(self): return (self.best_bid()+self.best_ask())/2 From ce87cb788a1c8e8f1e6697a0bc9566c0805ff380 Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sun, 22 Oct 2017 17:55:49 +0000 Subject: [PATCH 13/18] WIP: Attempts GDAX Market Depth tracking with MarketDepthUpdate --- numismatic/exchanges/gdax.py | 47 +++++++++++++++++++++++++++--------- numismatic/libs/events.py | 11 +++++++++ 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/numismatic/exchanges/gdax.py b/numismatic/exchanges/gdax.py index e78427a..caf9e68 100644 --- a/numismatic/exchanges/gdax.py +++ b/numismatic/exchanges/gdax.py @@ -9,7 +9,8 @@ import websockets from .base import Exchange -from ..libs.events import Heartbeat, Trade, LimitOrder, CancelOrder +from ..libs.events import Heartbeat, Trade, LimitOrder, CancelOrder, \ + MarketDepthUpdate logger = logging.getLogger(__name__) @@ -89,25 +90,49 @@ def _handle_packet(self, packet, symbol): if 'time' in msg: dt = datetime.strptime(msg['time'], '%Y-%m-%dT%H:%M:%S.%fZ') timestamp = dt.timestamp() + else: + timestamp = time.time() if 'type' in msg and msg['type']=='heartbeat': - msg = Heartbeat(exchange=self.exchange, symbol=symbol, - timestamp=timestamp) - self.output_stream.emit(msg) + event = Heartbeat(exchange=self.exchange, symbol=symbol, + timestamp=timestamp) + self.output_stream.emit(event) elif 'type' in msg and msg['type']=='ticker' and 'trade_id' in msg: sign = -1 if ('side' in msg and msg['side']=='sell') else 1 price = msg['price'] volume = sign * msg['last_size'] if 'last_size' in msg else 0 trade_id = msg['trade_id'] - msg = Trade(exchange=self.exchange, symbol=symbol, - timestamp=timestamp, price=price, volume=volume, - id=trade_id) - self.output_stream.emit(msg) + event = Trade(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=price, volume=volume, + id=trade_id) + self.output_stream.emit(event) + elif 'type' in msg and msg['type']=='snapshot': + for price, volume in msg['bids']: + event = MarketDepthUpdate(exchange=self.exchange, + symbol=symbol, timestamp=timestamp, + price=float(price), + volume=float(volume)) + self.output_stream.emit(event) + for price, volume in msg['asks']: + event = MarketDepthUpdate(exchange=self.exchange, + symbol=symbol, timestamp=timestamp, + price=float(price), + volume=-float(volume)) + self.output_stream.emit(event) + elif 'type' in msg and msg['type']=='l2update': + for side, price, size in msg['changes']: + volume = float(size) if side=='buy' else -float(size) + event = MarketDepthUpdate(exchange=self.exchange, + symbol=symbol, timestamp=timestamp, + price=float(price), + volume=volume) + self.output_stream.emit(event) elif isinstance(msg, dict): - self.output_stream.emit(msg) + event = msg + self.output_stream.emit(event) else: raise NotImplementedError(msg) - return msg + return event if __name__=='__main__': @@ -119,7 +144,7 @@ def _handle_packet(self, packet, symbol): printer = output_stream.map(print) bfx = GDAXExchange(output_stream=output_stream) - bfx_btc = bfx.listen('BTC-USD', 'ticker,heartbeat') + bfx_btc = bfx.listen('BTC-USD', 'level2') loop = asyncio.get_event_loop() future = asyncio.wait([bfx_btc], timeout=15) diff --git a/numismatic/libs/events.py b/numismatic/libs/events.py index bce52c3..b6c83c6 100644 --- a/numismatic/libs/events.py +++ b/numismatic/libs/events.py @@ -27,6 +27,7 @@ class Trade(Event): volume = attr.ib() id = attr.ib(default=None) + @attr.s(slots=True) class LimitOrder(Event): exchange = attr.ib() @@ -36,9 +37,19 @@ class LimitOrder(Event): volume = attr.ib() id = attr.ib() + @attr.s(slots=True) class CancelOrder(Event): exchange = attr.ib() symbol = attr.ib() timestamp = attr.ib() id = attr.ib() + + +@attr.s(slots=True) +class MarketDepthUpdate(Event): + exchange = attr.ib() + symbol = attr.ib() + timestamp = attr.ib() + price = attr.ib() + volume = attr.ib() From fb2f75553514e32b8ae204dfd89633153e3cf9a2 Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sun, 22 Oct 2017 17:58:59 +0000 Subject: [PATCH 14/18] WIP: Attempts Market Depth tracking with overloading LimitOrder events --- numismatic/exchanges/gdax.py | 21 +++++++++------------ numismatic/orderbooks.py | 14 ++++++++++---- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/numismatic/exchanges/gdax.py b/numismatic/exchanges/gdax.py index caf9e68..96fafec 100644 --- a/numismatic/exchanges/gdax.py +++ b/numismatic/exchanges/gdax.py @@ -108,24 +108,21 @@ def _handle_packet(self, packet, symbol): self.output_stream.emit(event) elif 'type' in msg and msg['type']=='snapshot': for price, volume in msg['bids']: - event = MarketDepthUpdate(exchange=self.exchange, - symbol=symbol, timestamp=timestamp, - price=float(price), - volume=float(volume)) + event = LimitOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=float(price), + volume=float(volume), id=price) self.output_stream.emit(event) for price, volume in msg['asks']: - event = MarketDepthUpdate(exchange=self.exchange, - symbol=symbol, timestamp=timestamp, - price=float(price), - volume=-float(volume)) + event = LimitOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=float(price), + volume=-float(volume), id=price) self.output_stream.emit(event) elif 'type' in msg and msg['type']=='l2update': for side, price, size in msg['changes']: volume = float(size) if side=='buy' else -float(size) - event = MarketDepthUpdate(exchange=self.exchange, - symbol=symbol, timestamp=timestamp, - price=float(price), - volume=volume) + event = LimitOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=float(price), + volume=volume, id=price) self.output_stream.emit(event) elif isinstance(msg, dict): event = msg diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index e746482..1ef71ae 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -12,16 +12,22 @@ class OrderBook: asks = attr.ib(default=attr.Factory(PriorityQueue)) def update(self, order): - if isinstance(order, LimitOrder): - self.orders[order.id] = order + if isinstance(order, LimitOrder) and abs(order.volume>0): side = self.bids if order.volume>0 else self.asks price = -order.price if order.volume>0 else order.price side.add(order.id, price) + self.orders[order.id] = order elif isinstance(order, CancelOrder): order = self.orders[order.id] - del self.orders[order.id] side = self.bids if order.volume>0 else self.asks - price = -order.price if order.volume>0 else order.price + side.remove(order.id) + del self.orders[order.id] + elif isinstance(order, LimitOrder) and order.volume==0: + # When dealing with aggregated Level 2 data, a zero volume order + # indicates that that level can be removed. + order = self.orders[order.id] + del self.orders[order.id] + side = self.bids if order.price<=self.best_bid else self.asks side.remove(order.id) else: raise NotImplementedError(type(order)) From 0c21710708b4e429a4c955f6575d480b322919ca Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sun, 22 Oct 2017 17:58:28 +0000 Subject: [PATCH 15/18] Makes best_bid, best_ask and mid_price properties --- numismatic/collectors/orderbook.py | 6 +++--- numismatic/orderbooks.py | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/numismatic/collectors/orderbook.py b/numismatic/collectors/orderbook.py index a743181..ae9740c 100644 --- a/numismatic/collectors/orderbook.py +++ b/numismatic/collectors/orderbook.py @@ -27,9 +27,9 @@ def __attrs_post_init__(self): self.source_stream = (self.source_stream .map(self.order_book.update) - .map(lambda ob: (ob.mid_price(), - ob.best_bid(), - ob.best_ask())) + .map(lambda ob: (ob.mid_price, + ob.best_bid, + ob.best_ask)) ) if self.format=='text': self.source_stream = self.source_stream.map( diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index 1ef71ae..41ad1fc 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -33,16 +33,19 @@ def update(self, order): raise NotImplementedError(type(order)) return self + @property def best_bid(self): return self.orders[self.bids.peek()].price if self.bids else \ float('nan') + @property def best_ask(self): return self.orders[self.asks.peek()].price if self.asks else \ float('nan') + @property def mid_price(self): - return (self.best_bid()+self.best_ask())/2 + return (self.best_bid+self.best_ask)/2 if __name__=='__main__': From 78e0d44f6a27e80cf74e9122793e0b5faefe1650 Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sun, 22 Oct 2017 18:09:28 +0000 Subject: [PATCH 16/18] WIP: Tries to fix GDAX Market Depth --- numismatic/exchanges/gdax.py | 15 +++++++++++---- numismatic/orderbooks.py | 9 +-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/numismatic/exchanges/gdax.py b/numismatic/exchanges/gdax.py index 96fafec..e1724cd 100644 --- a/numismatic/exchanges/gdax.py +++ b/numismatic/exchanges/gdax.py @@ -119,10 +119,17 @@ def _handle_packet(self, packet, symbol): self.output_stream.emit(event) elif 'type' in msg and msg['type']=='l2update': for side, price, size in msg['changes']: - volume = float(size) if side=='buy' else -float(size) - event = LimitOrder(exchange=self.exchange, symbol=symbol, - timestamp=timestamp, price=float(price), - volume=volume, id=price) + if size=='0': + volume = float(size) if side=='buy' else -float(size) + event = LimitOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, price=float(price), + volume=volume, id=price) + else: + # this is actually removing a level + # make up a fake volume to indicate bid or ask + volume = 1.0 if side=='buy' else -1.0 + event = CancelOrder(exchange=self.exchange, symbol=symbol, + timestamp=timestamp, id=price) self.output_stream.emit(event) elif isinstance(msg, dict): event = msg diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index 41ad1fc..e0f729f 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -12,7 +12,7 @@ class OrderBook: asks = attr.ib(default=attr.Factory(PriorityQueue)) def update(self, order): - if isinstance(order, LimitOrder) and abs(order.volume>0): + if isinstance(order, LimitOrder): side = self.bids if order.volume>0 else self.asks price = -order.price if order.volume>0 else order.price side.add(order.id, price) @@ -22,13 +22,6 @@ def update(self, order): side = self.bids if order.volume>0 else self.asks side.remove(order.id) del self.orders[order.id] - elif isinstance(order, LimitOrder) and order.volume==0: - # When dealing with aggregated Level 2 data, a zero volume order - # indicates that that level can be removed. - order = self.orders[order.id] - del self.orders[order.id] - side = self.bids if order.price<=self.best_bid else self.asks - side.remove(order.id) else: raise NotImplementedError(type(order)) return self From cb6a78463e36affce2dcaf9b91e41d0c45d8c5cf Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Sun, 22 Oct 2017 20:21:32 +0000 Subject: [PATCH 17/18] WIP: Adds debugging prints --- numismatic/exchanges/gdax.py | 4 ++-- numismatic/orderbooks.py | 28 ++++++++++++++++------------ 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/numismatic/exchanges/gdax.py b/numismatic/exchanges/gdax.py index e1724cd..12ca475 100644 --- a/numismatic/exchanges/gdax.py +++ b/numismatic/exchanges/gdax.py @@ -123,13 +123,13 @@ def _handle_packet(self, packet, symbol): volume = float(size) if side=='buy' else -float(size) event = LimitOrder(exchange=self.exchange, symbol=symbol, timestamp=timestamp, price=float(price), - volume=volume, id=price) + volume=volume, id=float(price)) else: # this is actually removing a level # make up a fake volume to indicate bid or ask volume = 1.0 if side=='buy' else -1.0 event = CancelOrder(exchange=self.exchange, symbol=symbol, - timestamp=timestamp, id=price) + timestamp=timestamp, id=float(price)) self.output_stream.emit(event) elif isinstance(msg, dict): event = msg diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index e0f729f..0723d0e 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -12,18 +12,22 @@ class OrderBook: asks = attr.ib(default=attr.Factory(PriorityQueue)) def update(self, order): - if isinstance(order, LimitOrder): - side = self.bids if order.volume>0 else self.asks - price = -order.price if order.volume>0 else order.price - side.add(order.id, price) - self.orders[order.id] = order - elif isinstance(order, CancelOrder): - order = self.orders[order.id] - side = self.bids if order.volume>0 else self.asks - side.remove(order.id) - del self.orders[order.id] - else: - raise NotImplementedError(type(order)) + try: + if isinstance(order, LimitOrder): + side = self.bids if order.volume>0 else self.asks + price = -order.price if order.volume>0 else order.price + side.add(order.id, price) + self.orders[order.id] = order + elif isinstance(order, CancelOrder): + order = self.orders[order.id] + side = self.bids if order.volume>0 else self.asks + side.remove(order.id) + del self.orders[order.id] + else: + raise NotImplementedError(type(order)) + except KeyError as e: + print(order) + print(e) return self @property From 0b8d1f580adb9813da63149502e08e83b6195b2b Mon Sep 17 00:00:00 2001 From: Tobias Brandt Date: Mon, 23 Oct 2017 11:14:23 +0200 Subject: [PATCH 18/18] WIP: Trying other variations for GDAX --- numismatic/exchanges/gdax.py | 4 ++-- numismatic/orderbooks.py | 30 +++++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/numismatic/exchanges/gdax.py b/numismatic/exchanges/gdax.py index 12ca475..e1724cd 100644 --- a/numismatic/exchanges/gdax.py +++ b/numismatic/exchanges/gdax.py @@ -123,13 +123,13 @@ def _handle_packet(self, packet, symbol): volume = float(size) if side=='buy' else -float(size) event = LimitOrder(exchange=self.exchange, symbol=symbol, timestamp=timestamp, price=float(price), - volume=volume, id=float(price)) + volume=volume, id=price) else: # this is actually removing a level # make up a fake volume to indicate bid or ask volume = 1.0 if side=='buy' else -1.0 event = CancelOrder(exchange=self.exchange, symbol=symbol, - timestamp=timestamp, id=float(price)) + timestamp=timestamp, id=price) self.output_stream.emit(event) elif isinstance(msg, dict): event = msg diff --git a/numismatic/orderbooks.py b/numismatic/orderbooks.py index 0723d0e..354bace 100644 --- a/numismatic/orderbooks.py +++ b/numismatic/orderbooks.py @@ -12,22 +12,22 @@ class OrderBook: asks = attr.ib(default=attr.Factory(PriorityQueue)) def update(self, order): - try: - if isinstance(order, LimitOrder): - side = self.bids if order.volume>0 else self.asks - price = -order.price if order.volume>0 else order.price - side.add(order.id, price) - self.orders[order.id] = order - elif isinstance(order, CancelOrder): + if isinstance(order, LimitOrder): + side = self.bids if order.volume>0 else self.asks + price = -order.price if order.volume>0 else order.price + side.add(order.id, price) + self.orders[order.id] = order + elif isinstance(order, CancelOrder): + try: order = self.orders[order.id] - side = self.bids if order.volume>0 else self.asks - side.remove(order.id) - del self.orders[order.id] - else: - raise NotImplementedError(type(order)) - except KeyError as e: - print(order) - print(e) + except KeyError: + # Cancelling an order that doesn't exist + return self + side = self.bids if order.volume>0 else self.asks + side.remove(order.id) + del self.orders[order.id] + else: + raise NotImplementedError(type(order)) return self @property