Skip to content

Commit

Permalink
2.1.9
Browse files Browse the repository at this point in the history
  • Loading branch information
DogsTailFarmer committed Apr 14, 2024
1 parent a18aecf commit 49088c4
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 34 deletions.
10 changes: 7 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.1.9 2024-04-14
### Fix
* Creating and manage asynchronous tasks

## 2.1.8 2024-04-08
### Update
* `Bitfinex`: add logging for `cancel_order()` and `cancel_all_orders()` methods
Expand Down Expand Up @@ -125,15 +129,15 @@
## 1.4.9b2 2024-02-05
### Fix
* Binance: `TransferToMaster`: sentence `Email address should be encoded. e.g. [email protected] should be encoded into
alice%40test.com` from API docs the are False, must be `content += urlencode(kwargs["params"], safe="@")`
alice%40test.com` from API docs they are False, must be `content += urlencode(kwargs["params"], safe="@")`

### Update
* HTX: changed deprecated endpoint "v1/common/symbols" to "v1/settings/common/market-symbols"
* Binance: `GET /api/v3/exchangeInfo` from response remove deprecated `quotePrecision`

## 1.4.8 2024-02-02
### Added for new features
* Binance: `TransferToMaster` now can be used for collect assets on the sub-account
* Binance: `TransferToMaster` now can be used for collect assets on the subaccount

## 1.4.7.post6 2024-01-31
### Fix
Expand Down Expand Up @@ -165,7 +169,7 @@ alice%40test.com` from API docs the are False, must be `content += urlencode(kwa

## v1.4.4 2023-12-13
### Update
* Before send cancel order result checking if it was be executed and generating trade event
* Before send cancel order result checking if it was being executed and generating trade event
* Rollback 1.4.3
- Binance: in create Limit order parameters adding parameter "selfTradePreventionMode": "NONE"

Expand Down
2 changes: 1 addition & 1 deletion exchanges_wrapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__contact__ = "https://github.com/DogsTailFarmer"
__email__ = "[email protected]"
__credits__ = ["https://github.com/DanyaSWorlD"]
__version__ = "2.1.8"
__version__ = "2.1.9"

from pathlib import Path
import shutil
Expand Down
22 changes: 14 additions & 8 deletions exchanges_wrapper/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ def __init__(self, *acc):
self.main_account_uid = None
self.ledgers_id = []
self.ts_start = {}
self.tasks = set()

def tasks_manage(self, coro):
_t = asyncio.create_task(coro)
self.tasks.add(_t)
_t.add_done_callback(self.tasks.discard)

async def fetch_object(self, key):
res = None
Expand Down Expand Up @@ -145,7 +151,7 @@ async def load(self, symbol):
# load rate limits
self.rate_limits = infos["rateLimits"]
self.loaded = True
logger.info(f"Info for {self.exchange}:{symbol} loaded success")
logger.info(f"Info for {self.exchange}:{symbol} loaded successfully")

async def close(self):
await self.session.close()
Expand Down Expand Up @@ -176,7 +182,7 @@ async def start_user_events_listener(self, _trade_id, symbol):
user_data_stream = BBTPrivateEventsDataStream(self, self.endpoint_ws_auth, self.exchange, _trade_id)
if user_data_stream:
self.data_streams[_trade_id] |= {user_data_stream}
asyncio.ensure_future(user_data_stream.start())
self.tasks_manage(user_data_stream.start())
timeout = STATUS_TIMEOUT / 0.1
while not user_data_stream.wss_started:
timeout -= 1
Expand All @@ -190,8 +196,7 @@ async def start_market_events_listener(self, _trade_id):
if self.exchange == 'binance':
market_data_stream = MarketEventsDataStream(self, self.endpoint_ws_public, self.exchange, _trade_id)
self.data_streams[_trade_id] |= {market_data_stream}
asyncio.ensure_future(market_data_stream.start())
# start_list.append(market_data_stream.start())
self.tasks_manage(market_data_stream.start())
else:
for channel in _events:
# https://www.okx.com/help-center/changes-to-v5-api-websocket-subscription-parameter-and-url
Expand All @@ -202,7 +207,7 @@ async def start_market_events_listener(self, _trade_id):
#
market_data_stream = MarketEventsDataStream(self, _endpoint, self.exchange, _trade_id, channel)
self.data_streams[_trade_id] |= {market_data_stream}
asyncio.ensure_future(market_data_stream.start())
self.tasks_manage(market_data_stream.start())

async def stop_events_listener(self, _trade_id):
logger.info(f"Stop events listener data streams for {_trade_id}")
Expand Down Expand Up @@ -1249,12 +1254,12 @@ async def cancel_all_orders(self, trade_id, symbol, receive_window=None):
params = {'category': 'spot', 'symbol': symbol}
res, _ = await self.http.send_api_call("/v5/order/cancel-all", method="POST", signed=True, **params)

tasks = []
tasks = set()
for order in res.get('list', []):
_id = order.get('orderId')
task = asyncio.ensure_future(self.fetch_object(f"oc-{_id}"))
task = asyncio.create_task(self.fetch_object(f"oc-{_id}"))
task.set_name(f"{_id}")
tasks.append(task)
tasks.add(task)

if tasks:
done, pending = await asyncio.wait(tasks, timeout=STATUS_TIMEOUT)
Expand All @@ -1267,6 +1272,7 @@ async def cancel_all_orders(self, trade_id, symbol, receive_window=None):
_res = await self.fetch_order(trade_id, symbol, order_id=_id, response_type=True)
binance_res.append(_res)
pending.clear()
tasks.clear()

return binance_res

Expand Down
11 changes: 2 additions & 9 deletions exchanges_wrapper/events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import functools

from collections import defaultdict
import logging

Expand All @@ -11,7 +10,6 @@
# based on: https://stackoverflow.com/a/2022629/10144963
class Handlers(list):
async def __call__(self, *args, **kwargs):
loop = asyncio.get_running_loop()
trade_id = kwargs.pop('trade_id', None)
_trade_id = None
for func in self:
Expand All @@ -20,12 +18,7 @@ async def __call__(self, *args, **kwargs):
except Exception as ex: # skipcq: PYL-W0703
logger.warning(f"Handlers error when try get trade_id: {ex}")
if trade_id is None or trade_id == _trade_id:
if asyncio.iscoroutinefunction(func):
await func(*args, **kwargs)
continue
if kwargs:
func = functools.partial(func, **kwargs)
loop.run_in_executor(None, func, *args)
await func(*args, **kwargs)

def __repr__(self):
return f"Handlers({list.__repr__(self)})"
Expand Down
10 changes: 6 additions & 4 deletions exchanges_wrapper/exch_srv.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,16 @@ async def open_client_connection(self, request: mr.OpenClientConnectionRequest)
raise GRPCError(status=Status.FAILED_PRECONDITION, message=str(ex))

try:
await open_client.client.load(request.symbol)
await asyncio.wait_for(open_client.client.load(request.symbol), timeout=HEARTBEAT * 60)
except asyncio.CancelledError:
pass # Task cancellation should not be logged as an error
except asyncio.exceptions.TimeoutError:
await OpenClient.get_client(client_id).client.session.close()
OpenClient.remove_client(request.account_name)
raise GRPCError(status=Status.UNAVAILABLE, message=f"'{open_client.name}' timeout error")
except Exception as ex:
logger.warning(f"OpenClientConnection for '{open_client.name}' exception: {ex}")
logger.debug(f"Exception traceback: {traceback.format_exc()}")
await OpenClient.get_client(client_id).client.session.close()
OpenClient.remove_client(request.account_name)
raise GRPCError(status=Status.RESOURCE_EXHAUSTED, message=str(ex))

# Set rate_limiter
Expand Down Expand Up @@ -851,7 +853,7 @@ async def amain(host: str = '127.0.0.1', port: int = 50051):
for oc in OpenClient.open_clients:
await oc.client.session.close()

[task.cancel() for task in asyncio.all_tasks() if not task.done()]
[task.cancel() for task in asyncio.all_tasks() if not task.done() and task is not asyncio.current_task()]


def main():
Expand Down
17 changes: 10 additions & 7 deletions exchanges_wrapper/web_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, client, endpoint, exchange, trade_id):
self.wss_event_buffer = {}
self._order_book = None
self._price = None
self.tasks_list = []
self.tasks = set()
self.wss_started = False

async def start(self):
Expand Down Expand Up @@ -84,8 +84,13 @@ async def stop(self):
await self.websocket.close(code=4000)

def tasks_cancel(self):
[task.cancel() for task in self.tasks_list if not task.done()]
self.tasks_list.clear()
[task.cancel() for task in self.tasks if not task.done()]
self.tasks.clear()

def tasks_manage(self, coro):
_t = asyncio.create_task(coro)
self.tasks.add(_t)
_t.add_done_callback(self.tasks.discard)

async def _handle_event(self, *args):
pass # meant to be overridden in a subclass
Expand All @@ -110,8 +115,7 @@ async def _handle_messages(self, msg, symbol=None, ch_type=str()):
return
elif ((msg_data.get("ret_msg") == "subscribe" or msg_data.get("op") in ("auth", "subscribe"))
and msg_data.get("success")):
_task = asyncio.ensure_future(self.bybit_heartbeat(ch_type or "private"))
self.tasks_list.append(_task)
self.tasks_manage(self.bybit_heartbeat(ch_type or "private"))
if msg_data["op"] == "subscribe" and msg_data["success"] and not msg_data["ret_msg"]:
self.wss_started = True
elif ((msg_data.get("ret_msg") == "subscribe" or msg_data.get("op") in ("auth", "subscribe"))
Expand Down Expand Up @@ -602,8 +606,7 @@ async def _heartbeat(self, listen_key, interval=60 * 30):

async def start_wss(self):
logger.info(f"Start User WSS for {self.exchange}")
_task = asyncio.ensure_future(self._heartbeat(self.listen_key))
self.tasks_list.append(_task)
self.tasks_manage(self._heartbeat(self.listen_key))
try:
await self.ws_listener()
finally:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dynamic = ["version", "description"]
requires-python = ">=3.9"

dependencies = [
"crypto-ws-api==2.0.8",
"crypto-ws-api==2.0.9",
"grpcio==1.62.0",
"pyotp~=2.9.0",
"simplejson==3.19.2",
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
crypto-ws-api==2.0.8
crypto-ws-api==2.0.9
pyotp==2.9.0
simplejson==3.19.2
toml~=0.10.2
Expand Down

0 comments on commit 49088c4

Please sign in to comment.