From 4f6441505c6f2138e8e6d758b3ecd98b5fa50126 Mon Sep 17 00:00:00 2001 From: DogsTaiFarmer Date: Sun, 31 Mar 2024 12:50:19 +0300 Subject: [PATCH] 2.1.6 --- CHANGELOG.md | 7 +++++++ exchanges_wrapper/__init__.py | 2 +- exchanges_wrapper/bybit_parser.py | 4 ++-- exchanges_wrapper/client.py | 17 ++++++++--------- exchanges_wrapper/exch_srv.py | 8 ++++---- exchanges_wrapper/http_client.py | 12 ++++++------ exchanges_wrapper/web_sockets.py | 19 ++++++++++++++++--- pyproject.toml | 2 +- requirements.txt | 2 +- 9 files changed, 46 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ed00b3..ad36c16 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +## 2.1.6 2024-03-29 +### Fix +* Bybit: `on_balance_update`: missed event during transfer from API + +### Update +* Dependency: Up requirements for crypto-ws-api==2.0.8 + ## 2.1.5 2024-03-25 ### Fix * Bybit: `on_balance_update`: duplication during transfer from web-interface diff --git a/exchanges_wrapper/__init__.py b/exchanges_wrapper/__init__.py index 461c880..12ccb24 100755 --- a/exchanges_wrapper/__init__.py +++ b/exchanges_wrapper/__init__.py @@ -12,7 +12,7 @@ __contact__ = "https://github.com/DogsTailFarmer" __email__ = "jerry.fedorenko@yahoo.com" __credits__ = ["https://github.com/DanyaSWorlD"] -__version__ = "2.1.5" +__version__ = "2.1.6" from pathlib import Path import shutil diff --git a/exchanges_wrapper/bybit_parser.py b/exchanges_wrapper/bybit_parser.py index 7223214..43d6adc 100644 --- a/exchanges_wrapper/bybit_parser.py +++ b/exchanges_wrapper/bybit_parser.py @@ -451,8 +451,8 @@ def on_balance_update(data_in: list, ts: str, symbol: str, mode: str, uid=None) elif mode == 'universal': for i in data_in: if i['coin'] in symbol and \ - ((i['fromAccountType'] == 'UNIFIED' and i['fromMemberId'] == uid) - or (i['toAccountType'] == 'UNIFIED' and i['toMemberId'] == uid)): + ((i['fromAccountType'] == 'UNIFIED' and i['fromMemberId'] == str(uid)) + or (i['toAccountType'] == 'UNIFIED' and i['toMemberId'] == str(uid))): data_out.append( { i['transferId']: { diff --git a/exchanges_wrapper/client.py b/exchanges_wrapper/client.py index 27bb766..4ed25c5 100644 --- a/exchanges_wrapper/client.py +++ b/exchanges_wrapper/client.py @@ -1078,15 +1078,15 @@ async def cancel_order( # lgtm [py/similar-function] "This query requires an order_id on Bitfinex. Deletion by user number is not implemented." ) params = {'id': order_id} - res = ( - await self.user_wss_session.handle_request(trade_id, "oc", _params=params) - or await self.http.send_api_call( + res = await self.user_wss_session.handle_request(trade_id, "oc", _params=params) + if res is None or (res and isinstance(res, list) and res[6] == 'ERROR'): + logger.debug(f"cancel_order.bitfinex {order_id}: res1: {res}") + res = await self.http.send_api_call( "v2/auth/w/order/cancel", method="POST", signed=True, **params - ) - ) + ) if res and isinstance(res, list) and res[6] == 'SUCCESS': timeout = STATUS_TIMEOUT / 0.1 while timeout: @@ -1096,7 +1096,7 @@ async def cancel_order( # lgtm [py/similar-function] break await asyncio.sleep(0.1) else: - logger.warning(f"cancel_order.bitfinex {order_id}: res: {res}") + logger.debug(f"cancel_order.bitfinex {order_id}: res2: {res}") elif self.exchange == 'huobi': res = await self.http.send_api_call( f"v1/order/orders/{order_id}/submitcancel", @@ -1109,7 +1109,6 @@ async def cancel_order( # lgtm [py/similar-function] timeout -= 1 await asyncio.sleep(0.1) binance_res = await self.fetch_order(trade_id, symbol, order_id=res, response_type=True) - elif self.exchange == 'okx': _symbol = self.symbol_to_okx(symbol) _queue = asyncio.Queue() @@ -1187,9 +1186,10 @@ async def cancel_all_orders(self, trade_id, symbol, receive_window=None): **params, ) ) - logger.debug(f"cancel_all_orders.res: {res}") if res and res[6] == 'SUCCESS': return bfx.orders(res[4], response_type=True, cancelled=True) + logger.debug(f"bitfinex: cancel_all_orders.res: {res}") + elif self.exchange == 'huobi': orders = await self.fetch_open_orders(trade_id, symbol, receive_window=receive_window, response_type=True) orders_id = [str(order.get('orderId')) for order in orders] @@ -1295,7 +1295,6 @@ async def fetch_open_orders(self, trade_id, symbol, receive_window=None, respons method="POST", signed=True ) - # logger.debug(f"fetch_open_orders.res: {res}") if res: binance_res = bfx.orders(res) elif self.exchange == 'huobi': diff --git a/exchanges_wrapper/exch_srv.py b/exchanges_wrapper/exch_srv.py index d50da14..059d429 100755 --- a/exchanges_wrapper/exch_srv.py +++ b/exchanges_wrapper/exch_srv.py @@ -174,8 +174,7 @@ async def open_client_connection(self, request: mr.OpenClientConnectionRequest) if main_client.account_uid and main_client.account_id: open_client.client.main_account_uid = main_client.account_uid open_client.client.main_account_id = main_client.account_id - logger.info(f"The values for main Huobi account were received and set:" - f" UID: {main_client.account_uid} and account ID: {main_client.account_id}") + logger.info(f"Huobi UID: {main_client.account_uid} and account ID: {main_client.account_id}") else: logger.warning("No account IDs were received for the Huobi master account") await main_client.close() @@ -232,7 +231,8 @@ async def send_request(self, client_method_name, request, rate_limit=False, **kw try: res = await getattr(client, client_method_name)(**kwargs) except (asyncio.CancelledError, asyncio.exceptions.CancelledError): - pass # Task cancellation should not be logged as an error + msg = f"{msg_header} Server Shutdown" + raise GRPCError(status=Status.UNAVAILABLE, message=msg) except (errors.RateLimitReached, errors.QueryCanceled) as ex: Martin.rate_limit_reached_time = time.time() msg = f"{msg_header} RateLimitReached: {ex}" @@ -655,7 +655,7 @@ async def on_balance_update(self, request: mr.MarketRequest) -> mr.StreamRespons return _events.append(_event) _get_event_from_queue = True - except asyncio.TimeoutError: + except asyncio.exceptions.TimeoutError: _get_event_from_queue = False if client.exchange in ('bitfinex', 'huobi', 'bybit'): diff --git a/exchanges_wrapper/http_client.py b/exchanges_wrapper/http_client.py index ac65cea..3658b4b 100644 --- a/exchanges_wrapper/http_client.py +++ b/exchanges_wrapper/http_client.py @@ -58,10 +58,7 @@ async def handle_errors(self, response): if payload: if payload.get("error", "") == "ERR_RATE_LIMIT": raise RateLimitReached(RateLimitReached.message) - elif ( - (self.exchange == 'binance' and payload.get('code', 0) == -1021) - or (self.exchange == 'bybit' and payload.get('retCode', 0) == 10002) - ): + elif self.exchange == 'binance' and payload.get('code', 0) == -1021: raise ExchangeError(ERR_TIMESTAMP_OUTSIDE_RECV_WINDOW) else: raise ExchangeError(f"ExchangeError: {payload}") @@ -82,8 +79,11 @@ async def handle_errors(self, response): else: raise HTTPError(f"Malformed request: {payload}:{response.reason}:{response.text}") - if self.exchange == 'bybit' and payload and payload.get('retCode') == 0: - return payload.get('result'), payload.get('time') + if self.exchange == 'bybit' and payload: + if payload.get('retCode') == 0: + return payload.get('result'), payload.get('time') + elif payload.get('retCode') == 10002: + raise ExchangeError(ERR_TIMESTAMP_OUTSIDE_RECV_WINDOW) elif self.exchange == 'huobi' and payload and (payload.get('status') == 'ok' or payload.get('ok')): return payload.get('data', payload.get('tick')) elif self.exchange == 'okx' and payload and payload.get('code') == '0': diff --git a/exchanges_wrapper/web_sockets.py b/exchanges_wrapper/web_sockets.py index 6471bdd..51194c5 100644 --- a/exchanges_wrapper/web_sockets.py +++ b/exchanges_wrapper/web_sockets.py @@ -53,7 +53,12 @@ def __init__(self, client, endpoint, exchange, trade_id): self.wss_started = False async def start(self): - async for self.websocket in websockets.client.connect(self.endpoint, logger=logger_ws): + ping_interval = None if self.exchange == 'huobi' else 20 + async for self.websocket in websockets.connect( + self.endpoint, + logger=logger_ws, + ping_interval=ping_interval + ): try: await self.start_wss() except ConnectionClosed as ex: @@ -178,8 +183,9 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): else: logger.debug(f"Bitfinex undefined WSS: symbol: {symbol}, ch_type: {ch_type}, msg_data: {msg_data}") elif self.exchange == 'huobi': - if msg_data.get('ping'): - await self.websocket.send(json.dumps({"pong": msg_data.get('ping')})) + if ping := msg_data.get('ping'): + await self.websocket.send(json.dumps({"pong": ping})) + await asyncio.sleep(0) elif msg_data.get('action') == 'ping': pong = { "action": "pong", @@ -188,6 +194,7 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): } } await self.websocket.send(json.dumps(pong)) + await asyncio.sleep(0) elif msg_data.get('tick') or msg_data.get('data'): if ch_type == 'ticker': _price = msg_data.get('tick', {}).get('lastPrice', None) @@ -210,6 +217,7 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()): async def ws_listener(self, request=None, symbol=None, ch_type=str()): if request: await self.websocket.send(json.dumps(request)) + await asyncio.sleep(0) async for msg_data in self.websocket: await self._handle_messages(msg_data, symbol, ch_type) @@ -370,6 +378,7 @@ async def start_wss(self): "params": _params } await self.websocket.send(json.dumps(request)) + await asyncio.sleep(0) await self._handle_messages(await self.websocket.recv(), symbol=self.symbol) # request = { @@ -377,6 +386,7 @@ async def start_wss(self): "ch": "accounts.update#2" } await self.websocket.send(json.dumps(request)) + await asyncio.sleep(0) await self._handle_messages(await self.websocket.recv(), symbol=self.symbol) # request = { @@ -384,6 +394,7 @@ async def start_wss(self): "ch": f"orders#{self.symbol.lower()}" } await self.websocket.send(json.dumps(request)) + await asyncio.sleep(0) await self._handle_messages(await self.websocket.recv(), symbol=self.symbol) # request = { @@ -489,6 +500,7 @@ async def start_wss(self): ] } await self.websocket.send(json.dumps(request)) + await asyncio.sleep(0) await self._handle_messages(await self.websocket.recv()) # Channel subscription request = {"op": 'subscribe', @@ -539,6 +551,7 @@ async def start_wss(self): "args": [self.client.api_key, ts, signature] } await self.websocket.send(json.dumps(request)) + await asyncio.sleep(0) await self._handle_messages(await self.websocket.recv()) # Channel subscription request = { diff --git a/pyproject.toml b/pyproject.toml index d17eef1..19b677d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dynamic = ["version", "description"] requires-python = ">=3.9" dependencies = [ - "crypto-ws-api==2.0.7", + "crypto-ws-api==2.0.8", "grpcio==1.62.0", "pyotp~=2.9.0", "simplejson==3.19.2", diff --git a/requirements.txt b/requirements.txt index d398ad7..cd9ed50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -crypto-ws-api==2.0.7 +crypto-ws-api==2.0.8 pyotp==2.9.0 simplejson==3.19.2 toml~=0.10.2