diff --git a/CHANGELOG.md b/CHANGELOG.md index 156eb81..eaddc97 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## 3.0.4 - 2024-04-14 +### Fix +* Creating and manage asynchronous tasks + +### Update +* Up requirements for exchanges-wrapper==2.1.9 +* 'Backtesting': some minor improvements +* Remove cross-version updates + ## 3.0.3 - 2024-04-08 ### Update * Refine templates handling diff --git a/martin_binance/__init__.py b/martin_binance/__init__.py index 0615159..43afad9 100755 --- a/martin_binance/__init__.py +++ b/martin_binance/__init__.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.3" +__version__ = "3.0.4" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -29,10 +29,6 @@ TRIAL_PARAMS = Path(WORK_PATH, "trial_params.json") EQUAL_STR = "================================================================" -# TODO remove after update to 3.0.3 -if CONFIG_FILE.exists() and not TRIAL_PARAMS.exists(): - copy(Path(Path(__file__).parent.absolute(), "templates/trial_params.json"), TRIAL_PARAMS) - def init(): if CONFIG_FILE.exists(): diff --git a/martin_binance/backtest/optimizer.py b/martin_binance/backtest/optimizer.py index 6379344..66a5359 100755 --- a/martin_binance/backtest/optimizer.py +++ b/martin_binance/backtest/optimizer.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2024 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.3" +__version__ = "3.0.4" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -86,12 +86,6 @@ def objective(_trial): return _study -async def run_optimize(*args): - process = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE) - stdout, _ = await process.communicate() - return stdout.splitlines()[0] - - if __name__ == "__main__": logger = logging.getLogger('logger_S') logger.level = logging.INFO diff --git a/martin_binance/client.py b/martin_binance/client.py index 0febb83..aebdcdd 100644 --- a/martin_binance/client.py +++ b/martin_binance/client.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1" +__version__ = "3.0.4" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -71,12 +71,12 @@ async def connect(self): raise UserWarning(f"{ex}, reconnect...") from None except GRPCError as ex: status_code = ex.status + logger.warning(f"Exception on register client: {status_code.name}, {ex.message}") if status_code == Status.FAILED_PRECONDITION: raise SystemExit(1) from ex - raise UserWarning(f"Exception on register client: {status_code.name}, {ex.message}") from None + raise UserWarning else: - logger.info(f"gRPC session started for client_id: {_client.client_id}\n" - f"trade_id: {self.trade_id}") + logger.info(f"gRPC session started for client_id: {_client.client_id}, trade_id: {self.trade_id}") return _client async def send_request(self, _request, _request_type, **kwargs): @@ -85,7 +85,7 @@ async def send_request(self, _request, _request_type, **kwargs): kwargs['client_id'] = self.client.client_id kwargs['trade_id'] = self.trade_id try: - res = await _request(_request_type(**kwargs)) + return await _request(_request_type(**kwargs)) except asyncio.CancelledError: pass # Task cancellation should not be logged as an error except grpclib.exceptions.StreamTerminatedError: @@ -101,8 +101,6 @@ async def send_request(self, _request, _request_type, **kwargs): raise except Exception as ex: logger.error(f"Exception on send request {ex}") - else: - return res async def for_request(self, _request, _request_type, **kwargs): if not self.client: diff --git a/martin_binance/strategy_base.py b/martin_binance/strategy_base.py index 33cb8a7..65214ae 100644 --- a/martin_binance/strategy_base.py +++ b/martin_binance/strategy_base.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.2" +__version__ = "3.0.4" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -39,7 +39,7 @@ import martin_binance.params as prm from martin_binance import LAST_STATE_PATH, BACKTEST_PATH, HEARTBEAT, KLINES_INIT, EQUAL_STR, ORDER_TIMEOUT from martin_binance.backtest.exchange_simulator import Account as backTestAccount -from martin_binance.backtest.optimizer import run_optimize, OPTIMIZER, PARAMS_FLOAT +from martin_binance.backtest.optimizer import OPTIMIZER, PARAMS_FLOAT from martin_binance.client import Trade from martin_binance.lib import Candle, TradingCapabilityManager, Ticker, FundsEntry, OrderBook, Style, \ any2str, PrivateTrade, Order, convert_from_minute, OrderUpdate, load_file, load_last_state, Klines @@ -48,7 +48,7 @@ logger = logging.getLogger('logger_S') else: logger = logging.getLogger('logger') -loop = asyncio.get_event_loop() + color_init() RATE_LIMITER = HEARTBEAT * 5 @@ -112,7 +112,7 @@ def __init__(self): self.session_root = None self.state_file = None self.operational_status = None - self.tasks_list = [] + self.tasks = set() # self.time_operational = {'ts': 0.0, 'diff': 0.0, 'new': 0.0} # - See get_time() self.account = None @@ -122,6 +122,7 @@ def __init__(self): self.tlg_header = '' # - Header for Telegram message self.start_collect = None self.s_mode_break = None + self.backtest_process = None # Init in reset_backtest_vars() self.s_ticker = None self.s_order_book = None @@ -197,7 +198,7 @@ def get_buffered_ticker(self) -> Ticker: def get_buffered_funds(self) -> Dict[str, FundsEntry]: if self.get_time() - self.get_buffered_funds_last_time > self.rate_limiter: - loop.create_task(self.buffered_funds(print_info=False)) + self.tasks_manage(self.buffered_funds(print_info=False)) self.get_buffered_funds_last_time = self.get_time() return {self.base_asset: FundsEntry(self.funds[self.base_asset]), self.quote_asset: FundsEntry(self.funds[self.quote_asset])} @@ -240,22 +241,22 @@ def get_time(self) -> float: def transfer_to_master(self, symbol: str, amount: str): if prm.MODE in ('T', 'TC'): - loop.create_task(self.transfer2master(symbol, amount)) + self.tasks_manage(self.transfer2master(symbol, amount)) def place_limit_order(self, buy: bool, amount: Decimal, price: Decimal) -> int: self.order_id += 1 self.message_log(f"Send order id:{self.order_id} for {'BUY' if buy else 'SELL'}" f" {any2str(amount)} by {any2str(price)} = {any2str(amount * price)}", color=Style.B_YELLOW) - loop.create_task(self.place_limit_order_timeout(self.order_id)) - loop.create_task(self.create_limit_order(self.order_id, buy, any2str(amount), any2str(price))) + self.tasks_manage(self.place_limit_order_timeout(self.order_id)) + self.tasks_manage(self.create_limit_order(self.order_id, buy, any2str(amount), any2str(price))) if self.exchange == 'huobi': time.sleep(0.02) return self.order_id def cancel_order(self, order_id: int, cancel_all=False) -> None: - loop.create_task(self.cancel_order_timeout(order_id)) - loop.create_task(self.cancel_order_call(order_id, cancel_all)) + self.tasks_manage(self.cancel_order_timeout(order_id)) + self.tasks_manage(self.cancel_order_call(order_id, cancel_all)) def message_log(self, msg: str, log_level=logging.INFO, tlg=False, color=Style.WHITE) -> None: if prm.LOGGING: @@ -322,7 +323,7 @@ async def backtest_control(self): _ts = datetime.now(timezone.utc).replace(tzinfo=None) storage_name = Path(self.session_root, "_study.db") try: - _res = await run_optimize( + self.backtest_process = await asyncio.create_subprocess_exec( OPTIMIZER, f"{self.exchange}_{self.symbol}", Path(self.session_root, Path(prm.PARAMS).name), @@ -330,8 +331,16 @@ async def backtest_control(self): f"sqlite:///{storage_name}", json.dumps(prm_best or _prm_best), f"{prm.ID_EXCHANGE}_{prm.SYMBOL}_S.log", + + stdout=asyncio.subprocess.PIPE ) - prm_best = orjson.loads(_res) + stdout, _ = await self.backtest_process.communicate() + _res = stdout.splitlines() + if _res: + prm_best = orjson.loads(_res[0]) + else: + self.message_log("Backtest control: result is empty", log_level=logging.WARNING) + break except (asyncio.CancelledError, KeyboardInterrupt): break except Exception as err: @@ -339,6 +348,7 @@ async def backtest_control(self): self.message_log(f"Exception traceback: {traceback.format_exc()}", log_level=logging.DEBUG) break # + self.backtest_process = None storage_name.replace(storage_name.with_name('study.db')) if prm_best: _prm_best = dict(prm_best) @@ -469,7 +479,7 @@ def back_test_handler(self): print(f"Original time: {original_time}, test time: {test_time}, x = {original_time / test_time:.2f}") if prm.SAVE_DS: self._back_test_handler_ext() - loop.stop() + asyncio.get_event_loop().stop() def _back_test_handler_ext(self): # Save test data @@ -643,6 +653,9 @@ async def ask_exit(self): self.message_log("Got signal for exit", color=Style.MAGENTA) self.operational_status = False self.s_mode_break = True + if self.backtest_process: + self.backtest_process.terminate() + self.message_log("Backtest process was terminated", color=Style.GREEN) await asyncio.sleep(HEARTBEAT) if prm.MODE in ('T', 'TC'): try: @@ -821,7 +834,7 @@ async def cancel_order_call(self, _id: int, cancel_all=False, count=0): self.message_log(f"Cancel order {_id}: Warning, not result getting") _fetch_order = True finally: - if _fetch_order: + if prm.MODE in ('T', 'TC') and _fetch_order: res = await self.fetch_order(_id, _filled_update_call=True) if res.get('status') in ('CANCELED', 'EXPIRED_IN_MATCH'): await self.cancel_order_handler(_id, cancel_all) @@ -851,6 +864,7 @@ async def cancel_order_timeout(self, _id): if _id in self.canceled_order_id: self.canceled_order_id.remove(_id) self.on_cancel_order_error_string(_id, 'Cancel order timeout') + # await asyncio.sleep(0) async def transfer2master(self, symbol: str, amount: str): try: @@ -966,7 +980,7 @@ async def buffered_candle(self): klines[i.value] = kline_i if len(klines) == len(KLINES_INIT): - self.tasks_list.append(asyncio.ensure_future(self.on_klines_update(klines))) + self.tasks_manage(self.on_klines_update(klines)) else: self.message_log("Init buffered candle failed. try one else...", log_level=logging.WARNING) await asyncio.sleep(random.uniform(1, 5)) @@ -998,7 +1012,7 @@ async def on_klines_update(self, _klines: {str: Klines}): self.wss_fire_up = True else: for i in _intervals: - loop.create_task(self.aiter_candles(_klines, i)) + self.tasks_manage(self.aiter_candles(_klines, i)) async def create_limit_order(self, _id: int, buy: bool, amount: str, price: str) -> None: self.wait_order_id.append(_id) @@ -1046,7 +1060,7 @@ async def create_limit_order(self, _id: int, buy: bool, amount: str, price: str) else: _fetch_order = True finally: - if _fetch_order: + if prm.MODE in ('T', 'TC') and _fetch_order: await asyncio.sleep(HEARTBEAT) res = await self.fetch_order(0, str(_id), _filled_update_call=True) if res.get('status') in ('NEW', 'PARTIALLY_FILLED', 'FILLED'): @@ -1376,18 +1390,23 @@ async def wait_wss_init(self): if res.success: self.operational_status = True + def tasks_manage(self, coro): + _t = asyncio.create_task(coro) + self.tasks.add(_t) + _t.add_done_callback(self.tasks.discard) + async def wss_declare(self): # Market stream - self.tasks_list.append(asyncio.ensure_future(self.on_ticker_update())) + self.tasks_manage(self.on_ticker_update()) await self.buffered_candle() - self.tasks_list.append(asyncio.ensure_future(self.on_order_book_update())) + self.tasks_manage(self.on_order_book_update()) if prm.MODE in ('T', 'TC'): # User Stream - self.tasks_list.append(asyncio.ensure_future(self.on_funds_update())) - self.tasks_list.append(asyncio.ensure_future(self.on_order_update())) - self.tasks_list.append(asyncio.ensure_future(self.on_balance_update())) + self.tasks_manage(self.on_funds_update()) + self.tasks_manage(self.on_order_update()) + self.tasks_manage(self.on_balance_update()) if prm.MODE == 'TC': - self.tasks_list.append(asyncio.ensure_future(self.backtest_control())) + self.tasks_manage(self.backtest_control()) async def wss_init(self, update_max_queue_size=False): if self.client_id: @@ -1416,8 +1435,8 @@ async def wss_init(self, update_max_queue_size=False): self.wss_fire_up = True def task_cancel(self): - [task.cancel() for task in self.tasks_list if not task.done()] - self.tasks_list.clear() + [task.cancel() for task in self.tasks if not task.done()] + self.tasks.clear() async def main(self, _symbol): restore_state = None @@ -1486,7 +1505,7 @@ async def main(self, _symbol): else: [exch_orders_ids.append(int(_o['orderId'])) for _o in active_orders] # Init section - loop.create_task(self.get_exchange_info(send_request, _symbol)) + self.tasks_manage(self.get_exchange_info(send_request, _symbol)) while not self.info_symbol: await asyncio.sleep(0.1) if prm.LOGGING: @@ -1592,14 +1611,7 @@ async def main(self, _symbol): last_state.pop('ms_start_time_ms', str(int(time.time() * 1000))) ) - # TODO Replace after 3.0.2 - # self.orders = jsonpickle.decode(last_state.pop(MS_ORDERS, '{}'), keys=True) - - _orders = last_state.pop(MS_ORDERS, '{}') - _orders = _orders.replace('margin_wrapper', 'lib') - self.orders = jsonpickle.decode(_orders, keys=True) - # - + self.orders = jsonpickle.decode(last_state.pop(MS_ORDERS, '{}'), keys=True) orders_keys = self.orders.keys() for _id in exch_orders_ids: if _id not in orders_keys: @@ -1641,14 +1653,14 @@ async def main(self, _symbol): if prm.MODE in ('T', 'TC'): await self.wss_init() await self.wait_wss_init() - loop.create_task(save_to_csv()) - loop.create_task(self.buffered_orders()) + self.tasks_manage(save_to_csv()) + self.tasks_manage(self.buffered_orders()) if self.session.client.real_market: - loop.create_task(self.save_asset()) + self.tasks_manage(self.save_asset()) if not restore_state: self.start() - loop.create_task(self.heartbeat(self.session)) + self.tasks_manage(self.heartbeat(self.session)) except (KeyboardInterrupt, SystemExit): # noinspection PyProtectedMember, PyUnresolvedReferences diff --git a/martin_binance/templates/cli_0_BTCUSDT.py b/martin_binance/templates/cli_0_BTCUSDT.py index 2815509..94d89a9 100644 --- a/martin_binance/templates/cli_0_BTCUSDT.py +++ b/martin_binance/templates/cli_0_BTCUSDT.py @@ -137,12 +137,12 @@ def trade(strategy=None): loop.create_task(strategy.main(ex.SYMBOL)) loop.run_forever() except KeyboardInterrupt: - pass + pass # user interrupt finally: try: loop.run_until_complete(strategy.ask_exit()) except (asyncio.CancelledError, KeyboardInterrupt): - pass + pass # user interrupt except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/martin_binance/templates/cli_1_BTCUSDT.py b/martin_binance/templates/cli_1_BTCUSDT.py index f53fb69..f0ba03c 100644 --- a/martin_binance/templates/cli_1_BTCUSDT.py +++ b/martin_binance/templates/cli_1_BTCUSDT.py @@ -137,12 +137,12 @@ def trade(strategy=None): loop.create_task(strategy.main(ex.SYMBOL)) loop.run_forever() except KeyboardInterrupt: - pass + pass # user interrupt finally: try: loop.run_until_complete(strategy.ask_exit()) except (asyncio.CancelledError, KeyboardInterrupt): - pass + pass # user interrupt except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/martin_binance/templates/cli_2_TESTBTCTESTUSDT.py b/martin_binance/templates/cli_2_TESTBTCTESTUSDT.py index 9e558ff..b7677af 100644 --- a/martin_binance/templates/cli_2_TESTBTCTESTUSDT.py +++ b/martin_binance/templates/cli_2_TESTBTCTESTUSDT.py @@ -137,12 +137,12 @@ def trade(strategy=None): loop.create_task(strategy.main(ex.SYMBOL)) loop.run_forever() except KeyboardInterrupt: - pass + pass # user interrupt finally: try: loop.run_until_complete(strategy.ask_exit()) except (asyncio.CancelledError, KeyboardInterrupt): - pass + pass # user interrupt except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/martin_binance/templates/cli_3_BTCUSDT.py b/martin_binance/templates/cli_3_BTCUSDT.py index e30d1f7..2736347 100755 --- a/martin_binance/templates/cli_3_BTCUSDT.py +++ b/martin_binance/templates/cli_3_BTCUSDT.py @@ -137,12 +137,12 @@ def trade(strategy=None): loop.create_task(strategy.main(ex.SYMBOL)) loop.run_forever() except KeyboardInterrupt: - pass + pass # user interrupt finally: try: loop.run_until_complete(strategy.ask_exit()) except (asyncio.CancelledError, KeyboardInterrupt): - pass + pass # user interrupt except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/pyproject.toml b/pyproject.toml index 716e033..33c86c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dynamic = ["version", "description"] requires-python = ">=3.9" dependencies = [ - "exchanges-wrapper==2.1.8", + "exchanges-wrapper==2.1.9", "jsonpickle==3.0.2", "psutil==5.9.6", "requests==2.31.0", diff --git a/requirements.txt b/requirements.txt index adf56ed..c33e57f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -exchanges-wrapper==2.1.8 +exchanges-wrapper==2.1.9 jsonpickle==3.0.2 psutil==5.9.6 requests==2.31.0