From b51f242e33cdf7bce250144a4e4b6db6eaffc3fe Mon Sep 17 00:00:00 2001 From: DogsTailFarmer Date: Tue, 19 Mar 2024 23:48:18 +0300 Subject: [PATCH] 3.0.1rc1 --- CHANGELOG.md | 13 +- README.md | 2 +- martin_binance/__init__.py | 2 +- martin_binance/backtest/OoTSP.py | 3 +- martin_binance/backtest/optimizer.py | 50 ++++--- martin_binance/cli_0_BTCUSDT.py.template | 9 +- martin_binance/cli_1_BTCUSDT.py.template | 11 +- .../cli_2_TESTBTCTESTUSDT.py.template | 9 +- martin_binance/cli_3_BTCUSDT.py.template | 11 +- martin_binance/client.py | 21 ++- martin_binance/executor.py | 45 +++--- martin_binance/strategy_base.py | 132 ++++++++++++------ pyproject.toml | 2 +- requirements.txt | 2 +- 14 files changed, 183 insertions(+), 129 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f25a2e0..77efe4c 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,18 @@ -## 3.0.0rc21 - 2024-03-13 +## 3.0.1rc1 - 2024-03-19 +### Fix +* Cyclic Backtesting workflow + +### Update +* Up requirements for exchanges-wrapper==2.1.2 + + +## 3.0.0rc22 - 2024-03-13 ### Fix * `Analytics`: Refine unload and processing assets data +### Update +* Up requirements for exchanges-wrapper==2.1.0 + ## 3.0.0rc20 - 2024-03-12 ### Fix * `cancel_order_call()`: incorrect using asyncio.wait_for() diff --git a/README.md b/README.md index 286739d..38ec711 100755 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ All risks and possible losses associated with use of this strategy lie with you. Strongly recommended that you test the strategy in the demo mode before using real bidding. ## Important notices -* After update to `3.0.0`, the configuration files `cli_XX_AAABBB.py` for all running trading pairs +* After update to `3.0.1`, the configuration files `cli_XX_AAABBB.py` for all running trading pairs should be updated. Use templates for reference. * You cannot run multiple pairs with overlapping currencies on the same account! diff --git a/martin_binance/__init__.py b/martin_binance/__init__.py index 492a92c..b2b36ca 100755 --- a/martin_binance/__init__.py +++ b/martin_binance/__init__.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0rc21" +__version__ = "3.0.1rc1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" diff --git a/martin_binance/backtest/OoTSP.py b/martin_binance/backtest/OoTSP.py index 5c79655..36d13fc 100644 --- a/martin_binance/backtest/OoTSP.py +++ b/martin_binance/backtest/OoTSP.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0rc15" +__version__ = "3.0.1rc1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -67,6 +67,7 @@ def main(): print_study_result(study) print(f"Study instance saved to {storage_name} for later use") elif answers.get('mode') == 'Analise saved study session': + # noinspection PyArgumentList study = optuna.load_study(study_name=study_name, storage=storage_name) print(f"Best value: {study.best_value}") diff --git a/martin_binance/backtest/optimizer.py b/martin_binance/backtest/optimizer.py index 089eb3c..8044d57 100755 --- a/martin_binance/backtest/optimizer.py +++ b/martin_binance/backtest/optimizer.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2024 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0rc4" +__version__ = "3.0.1rc1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -49,8 +49,9 @@ def try_trade(mbs, skip_log, **kwargs): return float(mbs.ex.SESSION_RESULT.get('profit', 0)) + float(mbs.ex.SESSION_RESULT.get('free', 0)) -def optimize(study_name, cli, n_trials, storage_name=None, skip_log=True, show_progress_bar=False): +def optimize(study_name, cli, n_trials, storage_name=None, prm_best=None, skip_log=True, show_progress_bar=False): sys.excepthook = notify_exception + optuna.logging.set_verbosity(optuna.logging.WARNING) def objective(_trial): params = { @@ -70,12 +71,13 @@ def objective(_trial): spec = iu.spec_from_file_location("strategy", cli) mbs = iu.module_from_spec(spec) spec.loader.exec_module(mbs) - optuna.logging.set_verbosity(optuna.logging.WARNING) _study = optuna.create_study(study_name=study_name, storage=storage_name, direction="maximize") - try: - _study.optimize(objective, n_trials=n_trials, gc_after_trial=True, show_progress_bar=show_progress_bar) - except KeyboardInterrupt: - pass # ignore + + if prm_best: + logger.info(f"Previous best params: {prm_best}") + _study.enqueue_trial(prm_best) + + _study.optimize(objective, n_trials=n_trials, gc_after_trial=True, show_progress_bar=show_progress_bar) return _study @@ -90,18 +92,32 @@ async def run_optimize(*args): logger.level = logging.INFO formatter = logging.Formatter(fmt="[%(asctime)s: %(levelname)s] %(message)s") # - fh = logging.handlers.RotatingFileHandler(Path(LOG_PATH, sys.argv[5]), maxBytes=500000, backupCount=5) + fh = logging.handlers.RotatingFileHandler(Path(LOG_PATH, sys.argv[6]), maxBytes=500000, backupCount=5) fh.setFormatter(formatter) fh.setLevel(logging.INFO) logger.addHandler(fh) # - study = optimize(sys.argv[1], sys.argv[2], int(sys.argv[3]), storage_name=sys.argv[4]) - logger.info(f"Optimal parameters: {study.best_params} for get {study.best_value}") - new_value = study.best_value - _value = study.get_trials()[0].value - if new_value > _value: - res = study.best_params - res |= {'new_value': any2str(new_value), '_value': any2str(_value)} - print(json.dumps(res)) + prm_best = None + try: + prm_best = json.loads(sys.argv[5]) + study = optimize( + sys.argv[1], + sys.argv[2], + int(sys.argv[3]), + storage_name=sys.argv[4], + prm_best=prm_best + ) + except KeyboardInterrupt: + pass # ignore + except Exception as ex: + logger.info(f"optimizer: {ex}") else: - print(json.dumps({})) + logger.info(f"Optimal parameters: {study.best_params} for get {study.best_value}") + new_value = study.best_value + _value = study.get_trials()[0].value + if new_value > _value or not prm_best: + res = study.best_params + res |= {'new_value': any2str(new_value), '_value': any2str(_value)} + print(json.dumps(res)) + else: + print(json.dumps({})) diff --git a/martin_binance/cli_0_BTCUSDT.py.template b/martin_binance/cli_0_BTCUSDT.py.template index c530ffe..29f50b9 100644 --- a/martin_binance/cli_0_BTCUSDT.py.template +++ b/martin_binance/cli_0_BTCUSDT.py.template @@ -7,7 +7,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" """ @@ -37,11 +37,10 @@ import martin_binance.params as ex ex.SYMBOL = 'BTCUSDT' # Exchange setup, see list of exchange in ms_cfg.toml ex.ID_EXCHANGE = 0 # See ms_cfg.toml Use for collection of statistics *and get client connection* -ex.FEE_IN_PAIR = True # Fee pays in pair ex.FEE_MAKER = Decimal('0.1') # standard exchange Fee for maker ex.FEE_TAKER = Decimal('0.1') # standard exchange Fee for taker -ex.FEE_SECOND = False # On KRAKEN fee always in second coin -ex.FEE_BNB_IN_PAIR = False # Binance fee in BNB and BNB is base asset +ex.FEE_FIRST = False # For example fee in BNB and BNB in pair, and it is base asset +ex.FEE_SECOND = False # For example fee in BNB and BNB in pair, and it is quote asset ex.GRID_MAX_COUNT = 5 # Maximum counts for placed grid orders # Trade parameter ex.START_ON_BUY = True # First cycle direction @@ -147,8 +146,6 @@ def trade(strategy=None): except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) - if ex.MODE in ('T', 'TC'): - loop.close() return strategy diff --git a/martin_binance/cli_1_BTCUSDT.py.template b/martin_binance/cli_1_BTCUSDT.py.template index b46abe1..c3f82e6 100644 --- a/martin_binance/cli_1_BTCUSDT.py.template +++ b/martin_binance/cli_1_BTCUSDT.py.template @@ -7,7 +7,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" """ @@ -37,12 +37,11 @@ import martin_binance.params as ex ex.SYMBOL = 'BTCUSDT' # Exchange setup, see list of exchange in ms_cfg.toml ex.ID_EXCHANGE = 1 # See ms_cfg.toml Use for collection of statistics *and get client connection* -ex.FEE_IN_PAIR = True # Fee pays in pair ex.FEE_MAKER = Decimal('0.08') # standard exchange Fee for maker ex.FEE_TAKER = Decimal('0.1') # standard exchange Fee for taker -ex.FEE_SECOND = False # On KRAKEN fee always in second coin -ex.FEE_BNB_IN_PAIR = False # Binance fee in BNB and BNB is base asset -ex.GRID_MAX_COUNT = 5 # Maximum counts for placed grid orders +ex.FEE_FIRST = False # For example fee in BNB and BNB in pair, and it is base asset +ex.FEE_SECOND = False # For example fee in BNB and BNB in pair, and it is quote asset +ex.GRID_MAX_COUNT = 3 # Maximum counts for placed grid orders # Trade parameter ex.START_ON_BUY = True # First cycle direction ex.AMOUNT_FIRST = Decimal('0.05') # Deposit for Sale cycle in first currency @@ -147,8 +146,6 @@ def trade(strategy=None): except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) - if ex.MODE in ('T', 'TC'): - loop.close() return strategy diff --git a/martin_binance/cli_2_TESTBTCTESTUSDT.py.template b/martin_binance/cli_2_TESTBTCTESTUSDT.py.template index 414c16a..613addf 100644 --- a/martin_binance/cli_2_TESTBTCTESTUSDT.py.template +++ b/martin_binance/cli_2_TESTBTCTESTUSDT.py.template @@ -7,7 +7,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" """ @@ -37,11 +37,10 @@ import martin_binance.params as ex ex.SYMBOL = 'TESTBTCTESTUSDT' # Exchange setup, see list of exchange in ms_cfg.toml ex.ID_EXCHANGE = 2 # See ms_cfg.toml Use for collection of statistics *and get client connection* -ex.FEE_IN_PAIR = True # Fee pays in pair ex.FEE_MAKER = Decimal('0.1') # standard exchange Fee for maker ex.FEE_TAKER = Decimal('0.17') # standard exchange Fee for taker -ex.FEE_SECOND = False # On KRAKEN fee always in second coin -ex.FEE_BNB_IN_PAIR = False # Binance fee in BNB and BNB is base asset +ex.FEE_FIRST = False # For example fee in BNB and BNB in pair, and it is base asset +ex.FEE_SECOND = False # For example fee in BNB and BNB in pair, and it is quote asset ex.GRID_MAX_COUNT = 5 # Maximum counts for placed grid orders # Trade parameter ex.START_ON_BUY = True # First cycle direction @@ -147,8 +146,6 @@ def trade(strategy=None): except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) - if ex.MODE in ('T', 'TC'): - loop.close() return strategy diff --git a/martin_binance/cli_3_BTCUSDT.py.template b/martin_binance/cli_3_BTCUSDT.py.template index bad778f..938fa66 100755 --- a/martin_binance/cli_3_BTCUSDT.py.template +++ b/martin_binance/cli_3_BTCUSDT.py.template @@ -7,7 +7,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" """ @@ -37,11 +37,10 @@ import martin_binance.params as ex ex.SYMBOL = 'BTCUSDT' # Exchange setup, see list of exchange in ms_cfg.toml ex.ID_EXCHANGE = 3 # See ms_cfg.toml Use for collection of statistics *and get client connection* -ex.FEE_IN_PAIR = True # Fee pays in pair ex.FEE_MAKER = Decimal('0.1') # standard exchange Fee for maker ex.FEE_TAKER = Decimal('0.15') # standard exchange Fee for taker -ex.FEE_SECOND = False # On KRAKEN fee always in second coin -ex.FEE_BNB_IN_PAIR = False # Binance fee in BNB and BNB is base asset +ex.FEE_FIRST = False # For example fee in BNB and BNB in pair, and it is base asset +ex.FEE_SECOND = False # For example fee in BNB and BNB in pair, and it is quote asset ex.GRID_MAX_COUNT = 5 # Maximum counts for placed grid orders # Trade parameter ex.START_ON_BUY = True # First cycle direction @@ -70,7 +69,7 @@ ex.COLLECT_ASSETS = False # Transfer free asset to main account, valid for suba ex.ADAPTIVE_TRADE_CONDITION = True ex.BB_CANDLE_SIZE_IN_MINUTES = 60 ex.BB_NUMBER_OF_CANDLES = 20 -ex.KBB = 1.0 # k for Bollinger Band +ex.KBB = 2.0 # k for Bollinger Band # Parameter for calculate price of grid orders by logarithmic scale # If -1 function is disabled, can take a value from 0 to infinity (in practice no more 1000) # When 0 - logarithmic scale, increase parameter the result is approaching linear @@ -147,8 +146,6 @@ def trade(strategy=None): except Exception as _err: print(f"Error: {_err}") loop.run_until_complete(loop.shutdown_asyncgens()) - if ex.MODE in ('T', 'TC'): - loop.close() return strategy diff --git a/martin_binance/client.py b/martin_binance/client.py index f16e162..b4890a4 100644 --- a/martin_binance/client.py +++ b/martin_binance/client.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0rc7" +__version__ = "3.0.0rc22" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -14,6 +14,7 @@ import grpclib.exceptions import shortuuid +import traceback from exchanges_wrapper import martin as mr, Channel, Status, GRPCError @@ -90,24 +91,20 @@ async def send_request(self, _request, _request_type, **kwargs): except asyncio.CancelledError: pass # Task cancellation should not be logged as an error except grpclib.exceptions.StreamTerminatedError: - logger.warning("Have not connection to gRPC server") + raise UserWarning("Have not connection to gRPC server") + except ConnectionRefusedError as ex: + raise UserWarning("Connection to gRPC server broken") except GRPCError as ex: status_code = ex.status - if ( - (status_code == Status.UNAVAILABLE - and 'failed to connect to all addresses' in ex.message) - or - (status_code == Status.UNKNOWN - and "No client exist" in ex.message) - ): + if status_code == Status.UNAVAILABLE: self.client = None raise UserWarning( "Connection to gRPC server failed, wait connection..." ) from ex - if status_code == Status.RESOURCE_EXHAUSTED: - raise - logger.debug(f"Exception on send request {_request}: {status_code.name}, {ex.message}") + logger.debug(f"Send request {_request}: {status_code.name}, {ex.message}") raise + except Exception as ex: + logger.error(f"Exception on send request {ex}") else: if res is None: self.client = None diff --git a/martin_binance/executor.py b/martin_binance/executor.py index 2d13bc5..cee6eb6 100755 --- a/martin_binance/executor.py +++ b/martin_binance/executor.py @@ -7,8 +7,6 @@ __version__ = "3.0.0rc19" __maintainer__ = "Jerry Fedorenko" __contact__ = 'https://github.com/DogsTailFarmer' - -import logging ################################################################## import sys import gc @@ -21,7 +19,7 @@ import sqlite3 import ujson as json -from datetime import datetime +from datetime import datetime, timezone import os import psutil import numpy as np @@ -40,8 +38,9 @@ class Strategy(StrategyBase): - def __init__(self): - super().__init__() + def __init__(self, call_super=True): + if call_super: + super().__init__() if LOGGING: print(f"Init Strategy, ver: {HEAD_VERSION} + {__version__} + {msb_ver}") self.cycle_buy = not START_ON_BUY if REVERSE else START_ON_BUY # + Direction (Buy/Sell) for current cycle @@ -121,7 +120,7 @@ def __init__(self): self.ts_grid_update = self.get_time() # - When updated grid self.wait_wss_refresh = {} # - - def init(self, check_funds: bool = True) -> None: # skipcq: PYL-W0221 + def init(self, check_funds=True) -> None: # skipcq: PYL-W0221 self.message_log('Start Init section') if COLLECT_ASSETS and GRID_ONLY: init_params_error = 'COLLECT_ASSETS and GRID_ONLY: one only allowed' @@ -272,10 +271,10 @@ def save_strategy_state(self, return_only=False) -> Dict[str, str]: last_price = self.get_buffered_ticker().last_price ticker_update = int(self.get_time()) - self.last_ticker_update if self.cycle_time: - ct = str(datetime.utcnow() - self.cycle_time).rsplit('.')[0] + ct = str(datetime.now(timezone.utc) - self.cycle_time).rsplit('.')[0] else: self.message_log("save_strategy_state: cycle_time is None!", log_level=logging.DEBUG) - ct = str(datetime.utcnow()).rsplit('.')[0] + ct = str(datetime.now(timezone.utc)).rsplit('.')[0] if self.command == 'stopped': self.message_log("Strategy stopped. Need manual action", tlg=True) elif self.grid_hold or self.tp_order_hold: @@ -580,7 +579,7 @@ def start(self, profit_f: Decimal = O_DEC, profit_s: Decimal = O_DEC) -> None: else: df = self.deposit_first - self.profit_first ds = O_DEC - ct = datetime.utcnow() - self.cycle_time + ct = datetime.now(timezone.utc) - self.cycle_time ct = ct.total_seconds() # noinspection PyUnboundLocalVariable data_to_db = { @@ -642,7 +641,7 @@ def start(self, profit_f: Decimal = O_DEC, profit_s: Decimal = O_DEC) -> None: f"Second: {self.sum_profit_second}\n" f"Summary: {self.sum_profit_first * self.avg_rate + self.sum_profit_second:f}\n") if self.first_run or MODE in ('T', 'TC'): - self.cycle_time = datetime.utcnow() + self.cycle_time = datetime.now(timezone.utc) # memory = psutil.virtual_memory() swap = psutil.swap_memory() @@ -910,12 +909,12 @@ def collect_assets(self) -> (): ff, fs, _, _ = self.get_free_assets(mode='free') tcm = self.get_trading_capability_manager() if ff >= f2d(tcm.min_qty): - self.message_log(f"Sending {ff} {self.f_currency} to main account", color=Style.UNDERLINE, tlg=True) + self.message_log(f"Sending {ff} {self.f_currency} to main account", color=Style.UNDERLINE) self.transfer_to_master(self.f_currency, any2str(ff)) else: ff = O_DEC if fs >= f2d(tcm.min_notional): - self.message_log(f"Sending {fs} {self.s_currency} to main account", color=Style.UNDERLINE, tlg=True) + self.message_log(f"Sending {fs} {self.s_currency} to main account", color=Style.UNDERLINE) self.transfer_to_master(self.s_currency, any2str(fs)) else: fs = O_DEC @@ -1630,7 +1629,7 @@ def after_filled_tp(self, one_else_grid: bool = False): self.tp_was_filled = () if self.convert_tp(amount_first_fee - profit_first, amount_second_fee - profit_second): return - self.message_log("Transfer filled TP amount to the next cycle", tlg=True) + self.message_log("Transfer filled TP amount to the next cycle") transfer_sum_amount_first = self.sum_amount_first transfer_sum_amount_second = self.sum_amount_second if self.cycle_buy: @@ -1697,7 +1696,7 @@ def reverse_after_grid_ending(self): self.reverse = False self.restart = True # Calculate profit and time for Reverse cycle - self.cycle_time = self.cycle_time_reverse or datetime.utcnow() + self.cycle_time = self.cycle_time_reverse or datetime.now(timezone.utc) if self.cycle_buy: self.profit_first += self.round_truncate(self.sum_amount_first - self.reverse_init_amount + self.tp_part_amount_first, base=True) @@ -1734,7 +1733,7 @@ def reverse_after_grid_ending(self): trend_up = adx.get('adx') > ADX_THRESHOLD and adx.get('+DI') > adx.get('-DI') trend_down = adx.get('adx') > ADX_THRESHOLD and adx.get('-DI') > adx.get('+DI') # print('adx: {}, +DI: {}, -DI: {}'.format(adx.get('adx'), adx.get('+DI'), adx.get('-DI'))) - self.cycle_time_reverse = self.cycle_time or datetime.utcnow() + self.cycle_time_reverse = self.cycle_time or datetime.now(timezone.utc) self.start_reverse_time = self.get_time() # Calculate target return amount tp = self.calc_profit_order(not self.cycle_buy) @@ -1939,10 +1938,9 @@ def convert_tp( _rounding=ROUND_CEILING ) self.message_log(f"For additional {'Buy' if self.cycle_buy else 'Sell'}" - f"{' Reverse' if self.reverse else ''} grid amount: {amount}", - tlg=True) + f"{' Reverse' if self.reverse else ''} grid amount: {amount}") if self.check_min_amount(amount=_amount_f): - self.message_log("Place additional grid orders", tlg=True) + self.message_log("Place additional grid orders") self.restore_orders_fire() if replace_tp: self.tp_hold_additional = True @@ -1954,7 +1952,7 @@ def convert_tp( additional_grid=True) return True if self.orders_hold: - self.message_log("Small amount was added to last held grid order", tlg=True) + self.message_log("Small amount was added to last held grid order") self.restore_orders_fire() _order = list(self.orders_hold.get_last()) _order[2] += (amount / _order[3]) if self.cycle_buy else amount @@ -1963,7 +1961,7 @@ def convert_tp( return True if self.orders_grid: - self.message_log("Small amount was added to last grid order", tlg=True) + self.message_log("Small amount was added to last grid order") self.restore_orders_fire() _order = list(self.orders_grid.get_last()) _order_updated = self.get_buffered_open_order(_order[0]) @@ -2333,7 +2331,7 @@ def on_order_update_ex(self, update: OrderUpdate) -> None: self.grid_remove = None self.cancel_grid(cancel_all=True) else: - self.message_log('Wild order, do not know it', tlg=True) + self.message_log(f"Wild order, do not know it: {update.original_order.id}", tlg=True) elif update.status == OrderUpdate.PARTIALLY_FILLED: if self.orders_grid.exist(update.original_order.id): self.message_log("Grid order partially filled", color=Style.B_WHITE) @@ -2410,7 +2408,7 @@ def on_order_update_ex(self, update: OrderUpdate) -> None: self.cancel_reverse_hold() self.message_log("Part filled TP was converted to grid", tlg=True) else: - self.message_log('Wild order, do not know it', tlg=True) + self.message_log(f"Wild order, do not know it: {update.original_order.id}", tlg=True) def cancel_reverse_hold(self): self.reverse_hold = False @@ -2602,3 +2600,6 @@ def restore_state_before_backtesting_ex(self, saved_state): orders, sum_amount=(self.cycle_buy, self.sum_amount_first, self.sum_amount_second) ) + + def reset_vars_ex(self): + self.__init__(call_super=False) diff --git a/martin_binance/strategy_base.py b/martin_binance/strategy_base.py index 76a4fcb..d31ca44 100644 --- a/martin_binance/strategy_base.py +++ b/martin_binance/strategy_base.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0rc21" +__version__ = "3.0.0rc24" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -20,7 +20,7 @@ import time import traceback from abc import abstractmethod -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from decimal import Decimal from pathlib import Path from shutil import rmtree, copy @@ -62,6 +62,7 @@ TICKER_PRKT = "ticker.parquet" MS_ORDER_ID = 'ms.order_id' MS_ORDERS = 'ms.orders' +CONTROLLED_ASSETS = ['BNB'] # which is not traded, but must be controlled O_DEC = Decimal() SAVE_TRADE_QUEUE = asyncio.Queue() @@ -116,12 +117,13 @@ def __init__(self): self.tasks_list = [] # self.time_operational = {'ts': 0.0, 'diff': 0.0, 'new': 0.0} # - See get_time() - self.account = backTestAccount(prm.SAVE_DS) if prm.MODE == 'S' else None + self.account = None self.get_buffered_funds_last_time = self.get_time() self.queue_to_tlg = queue.Queue() if prm.TOKEN and prm.MODE != 'S' else None self.status_time = None # + Last time sending status message self.tlg_header = '' # - Header for Telegram message self.start_collect = None + self.s_mode_break = None # Init in reset_backtest_vars() self.s_ticker = None self.s_order_book = None @@ -154,6 +156,7 @@ def reset_backtest_vars(self): self.grid_sell = {} def reset_vars(self): + self.account = None self.ticker = {} self.funds = {} self.order_book = {} @@ -167,6 +170,7 @@ def reset_vars(self): self.start_time_ms = int(time.time() * 1000) self.backtest = {} self.bulk_orders_cancel = {} + self.time_operational = {'ts': 0.0, 'diff': 0.0, 'new': 0.0} def update_vars(self, _session): self.client = _session.client @@ -289,8 +293,7 @@ def order_exist(self, _id) -> bool: def trade_not_exist(self, _order_id: int, _trade_id: int) -> bool: return all( - trade.order_id != _order_id or trade.id != _trade_id - for trade in self.trades + trade.order_id != _order_id or trade.id != _trade_id for trade in self.trades ) def order_trades_sum(self, _order_id: int) -> Decimal: @@ -311,18 +314,20 @@ async def backtest_control(self): """ Managing backtest and optimization cycles """ + while not self.operational_status: + await asyncio.sleep(HEARTBEAT) delay = HEARTBEAT * 30 # 1 min ts = time.time() restart = False - while not self.operational_status: - await asyncio.sleep(HEARTBEAT) + prm_best = {} + while self.operational_status: if self.start_collect and time.time() - ts > prm.SAVE_PERIOD: self.start_collect = False self.session_data_handler() self.reset_backtest_vars() if prm.SELF_OPTIMIZATION and self.command != 'stopped': - _ts = datetime.utcnow() + _ts = datetime.now(timezone.utc) storage_name = Path(self.session_root, "_study.db") try: _res = await run_optimize( @@ -331,30 +336,35 @@ async def backtest_control(self): Path(self.session_root, Path(prm.PARAMS).name), str(prm.N_TRIALS), f"sqlite:///{storage_name}", - f"{prm.ID_EXCHANGE}_{prm.SYMBOL}_S.log" + json.dumps(prm_best), + f"{prm.ID_EXCHANGE}_{prm.SYMBOL}_S.log", ) - _res = orjson.loads(_res) + prm_best = orjson.loads(_res) except (asyncio.CancelledError, KeyboardInterrupt): break except Exception as err: - self.message_log(f"Backtest control: {err}", log_level=logging.WARNING) + self.message_log(f"Backtest control: {err}", log_level=logging.ERROR) + self.message_log(f"Exception traceback: {traceback.format_exc()}", log_level=logging.DEBUG) else: storage_name.replace(storage_name.with_name('study.db')) - if _res: - self.message_log(f"Updating parameters from backtest," - f" predicted value {_res.pop('_value')} ->" - f" {_res.pop('new_value')}", - color=Style.B_WHITE, tlg=True) - for key, value in _res.items(): + if prm_best: + self.message_log( + f"Updating parameters from backtest," + f" predicted value {prm_best.pop('_value')} -> {prm_best.pop('new_value')}", + color=Style.B_WHITE, + tlg=True + ) + for key, value in prm_best.items(): self.message_log(f"{key}: {getattr(prm, key)} -> {value}") setattr( prm, key, value if isinstance(value, int) or key in PARAMS_FLOAT else Decimal(f"{value}") ) + l_m = str(datetime.now(timezone.utc) - _ts + timedelta(seconds=prm.SAVE_PERIOD)).rsplit('.')[0] self.message_log( - f"Strategy parameters are optimal now. Optimization cycle duration" - f" {str(datetime.utcnow() - _ts + timedelta(seconds=prm.SAVE_PERIOD)).rsplit('.')[0]}", - color=Style.B_WHITE, tlg=True + f"Strategy parameters are optimal now. Optimization cycle duration {l_m}", + color=Style.B_WHITE, + tlg=True ) restart = True else: @@ -392,6 +402,7 @@ def session_data_handler(self): """ # Finalize ticker file if _ticker := self.s_ticker['pylist']: + # noinspection PyArgumentList self.s_ticker['writer'].write_batch( pa.RecordBatch.from_pylist(mapping=_ticker) ) @@ -400,6 +411,7 @@ def session_data_handler(self): # Finalize order_book file if _order_book := self.s_order_book['pylist']: + # noinspection PyArgumentList self.s_order_book['writer'].write_batch( pa.RecordBatch.from_pylist(mapping=_order_book) ) @@ -414,6 +426,7 @@ def session_data_handler(self): # Finalize candles files for i in KLINES_INIT: if _candles := self.candles[f"pylist_{i.value}"]: + # noinspection PyArgumentList self.candles[f"writer_{i.value}"].write_batch( pa.RecordBatch.from_pylist(mapping=_candles) ) @@ -454,7 +467,7 @@ def back_test_handler(self): s_free = prm.SESSION_RESULT['free'] = f"{self.get_free_assets(mode='free', backtest=True)[2]}" if prm.LOGGING: print(f"Session profit: {s_profit}, free: {s_free}, total: {float(s_profit) + float(s_free)}") - test_time = datetime.utcnow() - self.cycle_time + test_time = datetime.now(timezone.utc) - self.cycle_time original_time = (self.backtest['ticker_index_last'] - self.backtest['ticker_index_first']) / 1000 original_time = timedelta(seconds=original_time) print(f"Original time: {original_time}, test time: {test_time}, x = {original_time / test_time:.2f}") @@ -514,8 +527,11 @@ async def heartbeat(self, _session): mr.MarketRequest, symbol=self.symbol ) + except UserWarning as ex: + self.wss_fire_up = True + self.message_log(f"{ex}", log_level=logging.WARNING) except Exception as ex: - self.message_log(f"Exception on check WSS: {ex}", log_level=logging.WARNING) + self.message_log(f"Exception on check WSS: {ex}", log_level=logging.ERROR) else: if not res.success: self.message_log(f"Not active WSS for {self.symbol} on {self.exchange}," @@ -523,8 +539,6 @@ async def heartbeat(self, _session): update_max_queue_size = True self.wss_fire_up = True # - - print(f"heartbeat: client_id: {self.client_id}, wss_fire_up: {self.wss_fire_up}") if self.client_id and self.wss_fire_up: try: if await self.session.get_client(): @@ -552,8 +566,8 @@ async def save_asset(self): while connection_analytic is None: connection_analytic = self.connection_analytic await asyncio.sleep(HEARTBEAT) - delay = 300 # 5 min - max_use_update = 12.5 * 60 # 12.5 min if the row has not been updated that the instance is down + delay = 600 # 10 min + max_use_update = 25 * 60 # 25 min if the row has not been updated that the instance is down while self.operational_status: try: res = await self.send_request(self.stub.fetch_account_information, mr.OpenClientConnectionId) @@ -593,12 +607,13 @@ async def save_asset(self): assets_fw[fw['asset']] = Decimal(fw['free']) + Decimal(fw['locked']) + Decimal(fw['freeze']) # Create list of cumulative asset from SPOT and Funding wallet assets = {} + controlled_assets = [self.base_asset, self.quote_asset] + CONTROLLED_ASSETS for balance in balances: if self.exchange != 'bitfinex': total = assets_fw.pop(balance['asset'], O_DEC) else: total = Decimal('0.0') - if balance['asset'] in (self.base_asset, self.quote_asset) or prm.GRID_ONLY: + if balance['asset'] in controlled_assets or prm.GRID_ONLY: total += Decimal(balance['free']) + Decimal(balance['locked']) assets[balance['asset']] = float(total) @@ -616,7 +631,7 @@ async def save_asset(self): for key, value in assets.items(): if prm.GRID_ONLY: refresh_t_asset(cursor, key, value, used=0) - elif key in (self.base_asset, self.quote_asset): + elif key in controlled_assets: refresh_t_asset(cursor, key, value, used=1) cursor.execute('COMMIT') @@ -630,8 +645,9 @@ async def save_asset(self): async def ask_exit(self): self.message_log("Got signal for exit", color=Style.MAGENTA) self.operational_status = False + self.s_mode_break = True + await asyncio.sleep(HEARTBEAT) if prm.MODE in ('T', 'TC'): - await asyncio.sleep(HEARTBEAT) try: await self.send_request(self.stub.stop_stream, mr.MarketRequest, symbol=self.symbol) except Exception as ex: @@ -644,20 +660,21 @@ async def ask_exit(self): self.start_collect = False self.session_data_handler() - self.channel.close() - tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - [task.cancel() for task in tasks] - await asyncio.gather(*tasks, return_exceptions=True) - if prm.LOGGING: - print(f"Cancelling {len(tasks)} outstanding tasks") + self.channel.close() + + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + [task.cancel() for task in tasks] + await asyncio.gather(*tasks, return_exceptions=True) + if prm.LOGGING: + print(f"Cancelling {len(tasks)} outstanding tasks") + if prm.LAST_STATE_FILE.exists(): + print(f"Current state saved into {prm.LAST_STATE_FILE}") + try: self.stop() except Exception as _err: print(f"ask_exit.strategy.stop: {_err}") - if prm.MODE in ('T', 'TC') and prm.LAST_STATE_FILE.exists(): - print(f"Current state saved into {prm.LAST_STATE_FILE}") - async def fetch_order(self, _id: int, _client_order_id: str = None, _filled_update_call=False): try: res = await self.send_request( @@ -735,8 +752,11 @@ async def loop_ds(self, ds, ticker=False): self.backtest['ticker_index_last'] = index_prev * 1000 async def aiter_candles(self, _klines: {str: Klines}, _i: str): + self.s_mode_break = None async for row in self.loop_ds(self.backtest[f"candles_{_i}"]): _klines.get(_i).refresh(row) + if self.s_mode_break: + break self.message_log(f"Backtest candles *** {_i} *** timeSeries ended") def open_orders_snapshot(self, ts=None): @@ -784,9 +804,12 @@ async def cancel_order_call(self, _id: int, cancel_all=False, count=0): except GRPCError as ex: _fetch_order = True self.message_log(f"Exception on cancel order {_id}: {ex.status.name}, {ex.message}") - except Exception as _ex: + except UserWarning as ex: + _fetch_order = True + self.message_log(f"Exception on cancel order call for {_id}: {ex}", log_level=logging.WARNING) + except Exception as ex: _fetch_order = True - self.message_log(f"Exception on cancel order call for {_id}: {_ex}", log_level=logging.WARNING) + self.message_log(f"Exception on cancel order call for {_id}: {ex}", log_level=logging.ERROR) self.message_log(f"Exception traceback: {traceback.format_exc()}", log_level=logging.DEBUG) else: # print(f"cancel_order_call.result: {result}") @@ -957,6 +980,7 @@ async def on_klines_update(self, _klines: {str: Klines}): _klines.get(res.interval).refresh(candle) if prm.MODE == 'TC' and (self.start_collect or self.start_collect is None): if len(self.candles[f"pylist_{res.interval}"]) > PYARROW_BATCH_BUFFER_SIZE: + # noinspection PyArgumentList self.candles[f"writer_{res.interval}"].write_batch( pa.RecordBatch.from_pylist(mapping=self.candles[f"pylist_{res.interval}"]) ) @@ -1184,6 +1208,7 @@ async def on_ticker_update(self): if prm.MODE == 'TC' and self.start_collect: ts = int(time.time() * 1000) if len(self.s_ticker['pylist']) > PYARROW_BATCH_BUFFER_SIZE: + # noinspection PyArgumentList self.s_ticker['writer'].write_batch( pa.RecordBatch.from_pylist(mapping=self.s_ticker['pylist']) ) @@ -1200,6 +1225,7 @@ async def on_ticker_update(self): else: if prm.LOGGING: pbar = tqdm(total=self.backtest['ticker'].metadata.num_rows) + self.s_mode_break = None async for row in self.loop_ds(self.backtest['ticker'], ticker=True): self.delay_ordering_s = row.pop('delay', 0) self.ticker = row @@ -1211,9 +1237,12 @@ async def on_ticker_update(self): if prm.LOGGING: # noinspection PyUnboundLocalVariable pbar.update() + if self.s_mode_break: + break if prm.LOGGING: pbar.close() self.message_log("Backtest *** ticker *** timeSeries ended") + self.s_mode_break = True self.back_test_handler() async def on_order_book_update(self): @@ -1230,6 +1259,7 @@ async def on_order_book_update(self): self.order_book['bids'] = self.order_book['bids'][:1] self.order_book['asks'] = self.order_book['asks'][:1] if len(self.s_order_book['pylist']) > PYARROW_BATCH_BUFFER_SIZE: + # noinspection PyArgumentList self.s_order_book['writer'].write_batch( pa.RecordBatch.from_pylist(mapping=self.s_order_book['pylist']) ) @@ -1242,9 +1272,12 @@ async def on_order_book_update(self): self.message_log(f"Exception traceback: {traceback.format_exc()}", log_level=logging.DEBUG) self.wss_fire_up = True else: + self.s_mode_break = None async for row in self.loop_ds(self.backtest['order_book']): self.order_book = row self.on_new_order_book(OrderBook(row)) + if self.s_mode_break: + break self.message_log("Backtest *** order_book *** timeSeries ended") async def buffered_orders(self): @@ -1301,13 +1334,13 @@ async def buffered_orders(self): # print("buffered_orders.Cancelled") self.operational_status = False except UserWarning as ex_2: - self.message_log(f"Exception buffered_orders: {ex_2}", log_level=logging.WARNING) + self.message_log(f"Exception buffered_orders 2: {ex_2}", log_level=logging.WARNING) restore = True except ConnectionRefusedError: restore = True except GRPCError as ex_3: status_code = ex_3.status - self.message_log(f"Exception buffered_orders: {status_code.name}, {ex_3.message}", + self.message_log(f"Exception buffered_orders 3: {status_code.name}, {ex_3.message}", log_level=logging.WARNING, color=Style.B_RED, tlg=True) if status_code == Status.RESOURCE_EXHAUSTED: # Decrease requests frequency @@ -1318,11 +1351,12 @@ async def buffered_orders(self): await self.send_request(self.stub.reset_rate_limit, mr.OpenClientConnectionId, rate_limiter=self.rate_limiter) except Exception as ex_4: - self.message_log(f"Exception buffered_orders:ResetRateLimit: {ex_4}", log_level=logging.WARNING) + self.message_log(f"Exception buffered_orders 4:ResetRateLimit: {ex_4}", + log_level=logging.WARNING) else: restore = True except Exception as ex_5: - self.message_log(f"Exception buffered_orders: {ex_5}", log_level=logging.ERROR) + self.message_log(f"Exception buffered_orders 5: {ex_5}", log_level=logging.ERROR) self.message_log(traceback.format_exc(), log_level=logging.DEBUG) restore = True await asyncio.sleep(self.rate_limiter) @@ -1506,8 +1540,10 @@ async def main(self, _symbol): # Init class atr for reuse in next backtest cycle raw_path = Path(self.session_root, "raw") self.reset_vars() + self.reset_vars_ex() # if prm.MODE == 'S': + self.account = backTestAccount(prm.SAVE_DS) self.account.funds.base = { 'asset': self.base_asset, 'free': prm.AMOUNT_FIRST, @@ -1590,16 +1626,16 @@ async def main(self, _symbol): self.time_operational['new'] = self.backtest['ticker_index_first'] / 1000 self.get_buffered_funds_last_time = self.get_time() self.start_time_ms = int(self.get_time() * 1000) - self.cycle_time = datetime.utcnow() + self.cycle_time = datetime.now(timezone.utc) # await self.wss_declare() if self.state_file.exists(): self.restore_state_before_backtesting() self.init(check_funds=False) - self.start_collect = True else: self.init() self.start() + self.start_collect = True if prm.MODE in ('T', 'TC'): await self.wss_init() @@ -1686,6 +1722,10 @@ def start(self, *args): def init(self, *args, **kwargs): raise NotImplementedError + @abstractmethod + def reset_vars_ex(self): + raise NotImplementedError + # endregion diff --git a/pyproject.toml b/pyproject.toml index 89ed335..50345b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dynamic = ["version", "description"] requires-python = ">=3.9" dependencies = [ - "exchanges-wrapper==2.0.1", + "exchanges-wrapper==2.1.2", "jsonpickle==3.0.2", "psutil==5.9.6", "requests==2.31.0", diff --git a/requirements.txt b/requirements.txt index 423ed3f..16c0a1c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -exchanges-wrapper==2.0.1 +exchanges-wrapper==2.1.2 jsonpickle==3.0.2 psutil==5.9.6 requests==2.31.0