Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions binance/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._conn.__aexit__(exc_type, exc_val, exc_tb)
self.ws = None
if not self._handle_read_loop:
self._log.error("CANCEL read_loop")
self._log.error(f"{self._path} CANCEL read_loop")
await self._kill_read_loop()

async def connect(self):
Expand All @@ -87,7 +87,8 @@ async def connect(self):
self._conn = ws.connect(ws_url, close_timeout=0.1) # type: ignore
try:
self.ws = await self._conn.__aenter__()
except: # noqa
except Exception: # noqa
self._log.debug(f"{self._path} exception caught. Reconnecting.")
await self._reconnect()
return
self.ws_state = WSListenerState.STREAMING
Expand Down Expand Up @@ -121,6 +122,7 @@ def _handle_message(self, evt):
return None

async def _read_loop(self):
self._log.debug(f"_read_loop started for symbol {self._path}")
try:
while True:
try:
Expand Down Expand Up @@ -150,15 +152,15 @@ async def _read_loop(self):
})
raise BinanceWebsocketUnableToConnect
except asyncio.TimeoutError:
self._log.debug(f"no message in {self.TIMEOUT} seconds")
self._log.debug(f"{self._path} no message in {self.TIMEOUT} seconds")
# _no_message_received_reconnect
except asyncio.CancelledError as e:
self._log.debug(f"cancelled error {e}")
break
self._log.debug(f"{self._path} cancelled error {e}")
raise e
except asyncio.IncompleteReadError as e:
self._log.debug(f"incomplete read error ({e})")
except ConnectionClosedError as e:
self._log.debug(f"connection close error ({e})")
self._log.debug(f"{self._path} connection close error ({e})")
except gaierror as e:
self._log.debug(f"DNS Error ({e})")
except BinanceWebsocketUnableToConnect as e:
Expand All @@ -182,7 +184,7 @@ async def _run_reconnect(self):
await asyncio.sleep(reconnect_wait)
await self.connect()
else:
self._log.error(f'Max reconnections {self.MAX_RECONNECTS} reached:')
self._log.error(f'{self._path} Max reconnections {self.MAX_RECONNECTS} reached:')
# Signal the error
await self._queue.put({
'e': 'error',
Expand All @@ -196,7 +198,11 @@ async def recv(self):
try:
res = await asyncio.wait_for(self._queue.get(), timeout=self.TIMEOUT)
except asyncio.TimeoutError:
self._log.debug(f"no message in {self.TIMEOUT} seconds")
self._log.debug(f"{self._path} recv no message in {self.TIMEOUT} seconds")

# Yield control to the event loop
await asyncio.sleep(0)

return res

async def _wait_for_reconnect(self):
Expand All @@ -214,7 +220,6 @@ async def before_reconnect(self):
self._reconnects += 1

def _no_message_received_reconnect(self):
self._log.debug('No message received, reconnecting')
self.ws_state = WSListenerState.RECONNECTING

async def _reconnect(self):
Expand Down