diff --git a/exchanges_wrapper/web_sockets.py b/exchanges_wrapper/web_sockets.py index 51194c5..35e67cc 100644 --- a/exchanges_wrapper/web_sockets.py +++ b/exchanges_wrapper/web_sockets.py @@ -8,8 +8,7 @@ import gzip from datetime import datetime, timezone from urllib.parse import urlencode, urlparse -import websockets.client -from websockets import ConnectionClosed +import websockets import exchanges_wrapper.bitfinex_parser as bfx import exchanges_wrapper.huobi_parser as hbp @@ -61,7 +60,7 @@ async def start(self): ): try: await self.start_wss() - except ConnectionClosed as ex: + except websockets.ConnectionClosed as ex: self.tasks_cancel() if ex.code == 4000: logger.info(f"WSS closed for {self.exchange}:{self.trade_id}") @@ -118,7 +117,7 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): elif ((msg_data.get("ret_msg") == "subscribe" or msg_data.get("op") in ("auth", "subscribe")) and not msg_data.get("success")): logger.warning(f"Reconnecting ByBit WSS: {symbol}: {ch_type}, msg_data: {msg_data}") - raise ConnectionClosed(None, None) + raise websockets.ConnectionClosed(None, None) else: logger.info(f"ByBit undefined WSS: symbol: {symbol}, ch_type: {ch_type}, msg_data: {msg_data}") elif self.exchange == 'okx': @@ -134,7 +133,7 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): self.wss_started = True elif msg_data.get("event") in ("login", "error") and msg_data.get("code") != "0": logger.warning(f"Reconnecting OKX WSS: {symbol}: {ch_type}, msg_data: {msg_data}") - raise ConnectionClosed(None, None) + raise websockets.ConnectionClosed(None, None) else: logger.debug(f"OKX undefined WSS: symbol: {symbol}, ch_type: {ch_type}, msg_data: {msg_data}") elif self.exchange == 'bitfinex': @@ -145,12 +144,12 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): if msg_data.get('platform') and msg_data.get('platform').get('status') != 1: logger.warning(f"Exchange in maintenance mode, trying reconnect. Exchange info: {msg}") await asyncio.sleep(60) - raise ConnectionClosed(None, None) + raise websockets.ConnectionClosed(None, None) elif 'code' in msg_data: code = msg_data.get('code') if code == 10300: logger.warning('WSS Subscription failed (generic)') - raise ConnectionClosed(None, None) + raise websockets.ConnectionClosed(None, None) elif code == 10301: logger.error('WSS Already subscribed') elif code == 10302: @@ -159,11 +158,11 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): raise UserWarning('WSS Reached limit of open channels') elif code == 20051: logger.warning('WSS reconnection request received from exchange') - raise ConnectionClosed(None, None) + raise websockets.ConnectionClosed(None, None) elif code == 20060: logger.info('WSS entering in maintenance mode, trying reconnect after 120s') await asyncio.sleep(120) - raise ConnectionClosed(None, None) + raise websockets.ConnectionClosed(None, None) elif msg_data.get('event') == 'subscribed': chan_id = msg_data.get('chanId') logger.info(f"bitfinex, ch_type: {ch_type}, chan_id: {chan_id}") @@ -210,7 +209,7 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): msg_data.get('code') == 500 and msg_data.get('message') == '系统异常:'): logger.warning(f"Reconnecting Huobi user {ch_type} channel") - raise ConnectionClosed(None, None) + raise websockets.ConnectionClosed(None, None) else: logger.debug(f"Huobi undefined WSS: symbol: {symbol}, ch_type: {ch_type}, msg_data: {msg_data}")