diff --git a/CHANGELOG.md b/CHANGELOG.md index 8917a35..a0ddb8e 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +## 3.0.1 - 2024-03-31 +### Refined and added new features +* Project architecture +* :rocket: Cyclic Backtesting workflow with update base trade parameters +* :rocket: `Backtesting`: handling of partially filling events +* :rocket: Migrate `gRPC` from [grpcio](https://grpc.io/) to [grpclib](https://github.com/vmagamedov/grpclib) + [python-betterproto](https://github.com/danielgtaylor/python-betterproto) +* Logging improvement +* `Analytics`: Refine unload and processing assets data +* Refactoring processing periodically events based on [schedule](https://github.com/dbader/schedule) +* New strategy mode: [Keeping level of first asset](https://github.com/DogsTailFarmer/martin-binance/wiki/How-it's-work#keeping-level-of-first-asset) + +### Update +* Up requirements for exchanges-wrapper==2.1.6 + ## 3.0.1rc7 - 2024-03-26 ### Update * Refactoring processing periodically events diff --git a/martin_binance/__init__.py b/martin_binance/__init__.py index a128487..e5e44e9 100755 --- a/martin_binance/__init__.py +++ b/martin_binance/__init__.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc7" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" diff --git a/martin_binance/backtest/OoTSP.py b/martin_binance/backtest/OoTSP.py index 6fd75ee..3a580fa 100644 --- a/martin_binance/backtest/OoTSP.py +++ b/martin_binance/backtest/OoTSP.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc3" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" diff --git a/martin_binance/backtest/exchange_simulator.py b/martin_binance/backtest/exchange_simulator.py index 8c3549a..e6f4bfc 100644 --- a/martin_binance/backtest/exchange_simulator.py +++ b/martin_binance/backtest/exchange_simulator.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc5" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" diff --git a/martin_binance/backtest/optimizer.py b/martin_binance/backtest/optimizer.py index c3dd98b..c401bae 100755 --- a/martin_binance/backtest/optimizer.py +++ b/martin_binance/backtest/optimizer.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2024 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc3" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -54,24 +54,25 @@ def optimize(study_name, cli, n_trials, storage_name=None, _prm_best=None, skip_ sys.excepthook = notify_exception optuna.logging.set_verbosity(optuna.logging.WARNING) + spec = iu.spec_from_file_location("strategy", cli) + mbs = iu.module_from_spec(spec) + spec.loader.exec_module(mbs) + def objective(_trial): params = { 'GRID_MAX_COUNT': _trial.suggest_int('GRID_MAX_COUNT', 3, 5), 'PRICE_SHIFT': _trial.suggest_float('PRICE_SHIFT', 0, 0.05, step=0.01), 'PROFIT': _trial.suggest_float('PROFIT', 0.05, 0.2, step=0.05), - 'PROFIT_MAX': _trial.suggest_float('PROFIT_MAX', 0.35, 1.0, step=0.05), + 'PROFIT_MAX': _trial.suggest_float('PROFIT_MAX', 0.4, 1.0, step=0.05), 'OVER_PRICE': _trial.suggest_float('OVER_PRICE', 0.1, 1, step=0.1), 'ORDER_Q': _trial.suggest_int('ORDER_Q', 6, 12), 'MARTIN': _trial.suggest_float('MARTIN', 5, 15, step=1), 'SHIFT_GRID_DELAY': _trial.suggest_int('SHIFT_GRID_DELAY', 10, 150, step=10), - 'KBB': _trial.suggest_float('KBB', 1, 5, step=0.5), - 'LINEAR_GRID_K': _trial.suggest_int('LINEAR_GRID_K', 0, 100, step=20), + 'KBB': _trial.suggest_float('KBB', 0.5, 4, step=0.5), + 'LINEAR_GRID_K': _trial.suggest_int('LINEAR_GRID_K', 0, 500, step=50), } return try_trade(mbs, skip_log, **params) - spec = iu.spec_from_file_location("strategy", cli) - mbs = iu.module_from_spec(spec) - spec.loader.exec_module(mbs) # noinspection PyArgumentList _study = optuna.create_study(study_name=study_name, storage=storage_name, direction="maximize") @@ -99,9 +100,8 @@ async def run_optimize(*args): fh.setLevel(logging.INFO) logger.addHandler(fh) # - prm_best = None + prm_best = json.loads(sys.argv[5]) try: - prm_best = json.loads(sys.argv[5]) study = optimize( sys.argv[1], sys.argv[2], @@ -114,10 +114,10 @@ async def run_optimize(*args): except Exception as ex: logger.info(f"optimizer: {ex}") else: - logger.info(f"Optimal parameters: {study.best_params} for get {study.best_value}") - new_value = study.best_value - _value = study.get_trials()[0].value - if new_value > _value or not prm_best: + new_value = round(study.best_value, ndigits=6) + logger.info(f"Optimal parameters: {study.best_params} for get {new_value}") + _value = round(study.get_trials()[0].value, ndigits=6) + if not prm_best or new_value > _value: res = study.best_params res |= {'new_value': any2str(new_value), '_value': any2str(_value)} print(json.dumps(res)) diff --git a/martin_binance/cli_0_BTCUSDT.py.template b/martin_binance/cli_0_BTCUSDT.py.template index 29f50b9..418d74b 100644 --- a/martin_binance/cli_0_BTCUSDT.py.template +++ b/martin_binance/cli_0_BTCUSDT.py.template @@ -141,7 +141,7 @@ def trade(strategy=None): finally: try: loop.run_until_complete(strategy.ask_exit()) - except asyncio.CancelledError: + except (asyncio.CancelledError, KeyboardInterrupt): pass except Exception as _err: print(f"Error: {_err}") diff --git a/martin_binance/cli_1_BTCUSDT.py.template b/martin_binance/cli_1_BTCUSDT.py.template index c3f82e6..5e196da 100644 --- a/martin_binance/cli_1_BTCUSDT.py.template +++ b/martin_binance/cli_1_BTCUSDT.py.template @@ -141,7 +141,7 @@ def trade(strategy=None): finally: try: loop.run_until_complete(strategy.ask_exit()) - except asyncio.CancelledError: + except (asyncio.CancelledError, KeyboardInterrupt): pass except Exception as _err: print(f"Error: {_err}") diff --git a/martin_binance/cli_2_TESTBTCTESTUSDT.py.template b/martin_binance/cli_2_TESTBTCTESTUSDT.py.template index 613addf..a7e02bd 100644 --- a/martin_binance/cli_2_TESTBTCTESTUSDT.py.template +++ b/martin_binance/cli_2_TESTBTCTESTUSDT.py.template @@ -141,7 +141,7 @@ def trade(strategy=None): finally: try: loop.run_until_complete(strategy.ask_exit()) - except asyncio.CancelledError: + except (asyncio.CancelledError, KeyboardInterrupt): pass except Exception as _err: print(f"Error: {_err}") diff --git a/martin_binance/cli_3_BTCUSDT.py.template b/martin_binance/cli_3_BTCUSDT.py.template index 938fa66..6f6954c 100755 --- a/martin_binance/cli_3_BTCUSDT.py.template +++ b/martin_binance/cli_3_BTCUSDT.py.template @@ -141,7 +141,7 @@ def trade(strategy=None): finally: try: loop.run_until_complete(strategy.ask_exit()) - except asyncio.CancelledError: + except (asyncio.CancelledError, KeyboardInterrupt): pass except Exception as _err: print(f"Error: {_err}") diff --git a/martin_binance/client.py b/martin_binance/client.py index 030d92a..0febb83 100644 --- a/martin_binance/client.py +++ b/martin_binance/client.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc7" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -16,7 +16,6 @@ import grpclib.exceptions import shortuuid -from martin_binance import ORDER_TIMEOUT from exchanges_wrapper import martin as mr, Channel, Status, GRPCError logger = logging.getLogger('logger.client') @@ -86,30 +85,23 @@ async def send_request(self, _request, _request_type, **kwargs): kwargs['client_id'] = self.client.client_id kwargs['trade_id'] = self.trade_id try: - res = await asyncio.wait_for(_request(_request_type(**kwargs)), ORDER_TIMEOUT) + res = await _request(_request_type(**kwargs)) except asyncio.CancelledError: pass # Task cancellation should not be logged as an error except grpclib.exceptions.StreamTerminatedError: raise UserWarning("Have not connection to gRPC server") except ConnectionRefusedError: raise UserWarning("Connection to gRPC server broken") - except asyncio.TimeoutError: - self.channel.close() - raise UserWarning("gRCP request timeout error") except GRPCError as ex: status_code = ex.status + logger.debug(f"Send request {_request}: {status_code.name}, {ex.message}") if status_code == Status.UNAVAILABLE: self.client = None raise UserWarning("Wait connection to gRPC server") from None - logger.debug(f"Send request {_request}: {status_code.name}, {ex.message}") raise except Exception as ex: logger.error(f"Exception on send request {ex}") else: - if res is None: - self.client = None - asyncio.create_task(self.get_client()) - raise UserWarning("Can't get response, restart connection to gRPC server ...") return res async def for_request(self, _request, _request_type, **kwargs): @@ -123,8 +115,10 @@ async def for_request(self, _request, _request_type, **kwargs): except asyncio.CancelledError: pass # Task cancellation should not be logged as an error except grpclib.exceptions.StreamTerminatedError: - logger.warning("WSS connection to gRPC server was terminated") + pass # handling in send_request() except GRPCError as ex: status_code = ex.status logger.warning(f"Exception on WSS loop: {status_code.name}, {ex.message}") raise + except Exception as ex: + logger.debug(f"for_request: {ex}") diff --git a/martin_binance/executor.py b/martin_binance/executor.py index 230b900..65dd11f 100755 --- a/martin_binance/executor.py +++ b/martin_binance/executor.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc7" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = 'https://github.com/DogsTailFarmer' ################################################################## @@ -59,13 +59,16 @@ def __init__(self, call_super=True): self.tp_hold_additional = False # - Need place TP after placed additional grid orders self.tp_target = O_DEC # + Target amount for TP that will be placed self.tp_amount = O_DEC # + Initial depo for active TP + self.tp_part_amount_first = O_DEC # + Sum partially filled TP + self.tp_part_amount_second = O_DEC # + Sum partially filled TP self.part_profit_first = O_DEC # + self.part_profit_second = O_DEC # + self.tp_was_filled = () # - Exist incomplete processing filled TP # self.sum_amount_first = O_DEC # Sum buy/sell in first currency for current cycle self.sum_amount_second = O_DEC # Sum buy/sell in second currency for current cycle - # + self.part_amount = {} # + {order_id: (Decimal(str(amount_f)), Decimal(str(amount_s)))} of partially filled + # self.deposit_first = AMOUNT_FIRST # + Calculated operational deposit self.deposit_second = AMOUNT_SECOND # + Calculated operational deposit self.sum_profit_first = O_DEC # + Sum profit from start to now() @@ -121,9 +124,12 @@ def __init__(self, call_super=True): # schedule.every(5).minutes.do(self.event_grid_update) schedule.every(5).seconds.do(self.event_processing) + schedule.every(1).minutes.do(self.event_grid_only_release) + schedule.every().minute.at(":30").do(self.event_grid_only_release) + schedule.every().minute.at(":35").do(self.event_update_tp) schedule.every(2).seconds.do(self.event_exec_command) if MODE in ('T', 'TC'): - schedule.every().minute.do(self.event_export_operational_status) + schedule.every().minute.at(":15").do(self.event_export_operational_status) schedule.every(10).seconds.do(self.event_get_command_tlg) schedule.every(6).seconds.do(self.event_report) @@ -410,10 +416,7 @@ def event_processing(self): self.wait_wss_refresh['allow_grid_shift'], self.wait_wss_refresh['additional_grid'], self.wait_wss_refresh['grid_update']) - if ADAPTIVE_TRADE_CONDITION and self.stable_state(): - if self.tp_order_id and not self.tp_part_amount_first and self.get_time() - self.tp_order[3] > 60 * 15: - self.message_log("Update TP order", color=Style.B_WHITE) - self.place_profit_order() + self.event_update_tp() if self.wait_refunding_for_start or self.tp_order_hold or self.grid_hold: self.get_buffered_funds() if self.reverse_hold: @@ -438,28 +441,57 @@ def event_processing(self): self.start() else: self.start_reverse_time = self.get_time() - if self.grid_only_restart and self.get_time() > self.grid_only_restart and START_ON_BUY and AMOUNT_FIRST: + + def event_update_tp(self): + if ADAPTIVE_TRADE_CONDITION and self.stable_state(): + if self.tp_order_id and not self.tp_part_amount_first and self.get_time() - self.tp_order[3] > 60 * 15: + self.message_log("Update TP order", color=Style.B_WHITE) + self.place_profit_order() + + def event_grid_only_release(self): + if self.grid_only_restart and START_ON_BUY and AMOUNT_FIRST: ff, fs, _, _ = self.get_free_assets(mode='available') - if ff < AMOUNT_FIRST and fs > AMOUNT_SECOND: + if self.get_time() > self.grid_only_restart and ff < AMOUNT_FIRST and fs > AMOUNT_SECOND: self.grid_only_restart = 0 + self.save_init_assets(ff, fs) self.sum_amount_first = self.sum_amount_second = O_DEC self.start() - def stable_state(self): + def _common_stable_conditions(self): + """ + Checks the common conditions for stability in both live and backtest modes. + """ return ( - self.shift_grid_threshold is None - and self.grid_remove is None - and not self.reverse_hold + self.grid_remove is None and not GRID_ONLY and not self.grid_update_started and not self.start_after_shift and not self.tp_hold and not self.tp_order_hold - and not self.tp_was_filled and not self.orders_init and self.command != 'stopped' ) + def stable_state(self): + """ + Checks if the system is in a stable state for live trading. + """ + return ( + self._common_stable_conditions() + and self.shift_grid_threshold is None + and not self.reverse_hold + ) + + def stable_state_backtest(self): + """ + Checks if the system is in a stable state for backtesting. + """ + return ( + self._common_stable_conditions() + and not self.part_amount + and not self.tp_part_amount_first + ) + def restore_strategy_state(self, strategy_state: Dict[str, str] = None, restore=True) -> None: if strategy_state: self.message_log("Restore strategy state from saved state:", log_level=logging.INFO) @@ -467,7 +499,7 @@ def restore_strategy_state(self, strategy_state: Dict[str, str] = None, restore= # self.command = json.loads(strategy_state.get('command')) self.grid_remove = json.loads(strategy_state.get('grid_remove', 'null')) - self.grid_only_restart = json.loads(strategy_state.get('grid_only_restart', 0)) + self.grid_only_restart = json.loads(strategy_state.get('grid_only_restart', "0")) # self.cycle_buy = json.loads(strategy_state.get('cycle_buy')) self.cycle_buy_count = json.loads(strategy_state.get('cycle_buy_count')) @@ -545,7 +577,7 @@ def restore_strategy_state(self, strategy_state: Dict[str, str] = None, restore= elif not self.orders_grid and not self.orders_hold and not self.orders_save and not self.tp_order_id: self.message_log("Restore, Restart", tlg=True) self.start() - if not GRID_ONLY and self.shift_grid_threshold is None and not self.tp_order_id: + if not self.tp_order_id and self.stable_state(): self.message_log("Restore, no TP order, replace", tlg=True) self.place_profit_order() @@ -2017,10 +2049,10 @@ def cancel_grid(self, cancel_all=False): self.message_log(f"cancel_grid order: {_id}", log_level=logging.DEBUG) self.cancel_order(_id, cancel_all=cancel_all) else: - self.message_log("cancel_grid: Ended", log_level=logging.DEBUG) + self.grid_remove = None self.orders_save.orders_list.clear() self.orders_hold.orders_list.clear() - self.grid_remove = None + self.message_log("cancel_grid: Ended", log_level=logging.DEBUG) if self.tp_was_filled: self.grid_update_started = None self.after_filled_tp(one_else_grid=False) diff --git a/martin_binance/lib.py b/martin_binance/lib.py index fb5c4f9..8db6370 100644 --- a/martin_binance/lib.py +++ b/martin_binance/lib.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc3" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" diff --git a/martin_binance/params.py b/martin_binance/params.py index 80db318..987878f 100644 --- a/martin_binance/params.py +++ b/martin_binance/params.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.0rc1" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -45,7 +45,7 @@ LOG_LEVEL = logging.DEBUG # Default level for console output HOLD_TP_ORDER_TIMEOUT = 30 COLLECT_ASSETS = bool() -GRID_ONLY_DELAY = 30 # sec delay before try restart GRID_ONLY cycle +GRID_ONLY_DELAY = 150 # sec delay before try restart GRID_ONLY cycle # ADAPTIVE_TRADE_CONDITION = bool() BB_CANDLE_SIZE_IN_MINUTES = int() diff --git a/martin_binance/strategy_base.py b/martin_binance/strategy_base.py index 018d6b4..6316459 100644 --- a/martin_binance/strategy_base.py +++ b/martin_binance/strategy_base.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc7" +__version__ = "3.0.1" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -135,9 +135,6 @@ def __init__(self): # self.cycle_time = None # + Cycle start time self.command = None # + External input command from Telegram - self.part_amount = {} # + {order_id: (Decimal(str(amount_f)), Decimal(str(amount_s)))} of partially filled - self.tp_part_amount_first = O_DEC # + Sum partially filled TP - self.tp_part_amount_second = O_DEC # + Sum partially filled TP self.connection_analytic = None # - Connection to .db def __call__(self): @@ -315,8 +312,8 @@ async def backtest_control(self): delay = HEARTBEAT * 30 # 1 min ts = time.time() restart = False + _prm_best = {} prm_best = {} - while self.operational_status: if self.start_collect and time.time() - ts > prm.SAVE_PERIOD: self.start_collect = False @@ -332,7 +329,7 @@ async def backtest_control(self): Path(self.session_root, Path(prm.PARAMS).name), str(prm.N_TRIALS), f"sqlite:///{storage_name}", - json.dumps(prm_best), + json.dumps(prm_best or _prm_best), f"{prm.ID_EXCHANGE}_{prm.SYMBOL}_S.log", ) prm_best = orjson.loads(_res) @@ -344,6 +341,7 @@ async def backtest_control(self): else: storage_name.replace(storage_name.with_name('study.db')) if prm_best: + _prm_best = dict(prm_best) self.message_log( f"Updating parameters from backtest," f" predicted value {prm_best.pop('_value')} -> {prm_best.pop('new_value')}", @@ -368,7 +366,7 @@ async def backtest_control(self): else: break - if restart and not self.part_amount and not self.tp_part_amount_first: + if restart and self.stable_state_backtest(): restart = False self.parquet_declare(Path(self.session_root, "raw")) # Refresh klines init @@ -649,10 +647,10 @@ async def ask_exit(self): if prm.MODE in ('T', 'TC'): try: await self.send_request(self.stub.stop_stream, mr.MarketRequest, symbol=self.symbol) - self.session.channel.close() except Exception as ex: self.message_log(f"ask_exit: {ex}", log_level=logging.WARNING) + self.session.channel.close() self.task_cancel() if prm.MODE == 'TC' and self.start_collect: @@ -745,6 +743,12 @@ async def loop_ds(self, ds, ticker=False): await asyncio.sleep(delay) yield orjson.loads(row['row']) + if self.s_mode_break: + break + else: + continue + break + if ticker: self.backtest['ticker_index_last'] = index_prev * 1000 @@ -1338,8 +1342,6 @@ async def buffered_orders(self): except UserWarning as ex_2: self.message_log(f"Exception buffered_orders 2: {ex_2}", log_level=logging.WARNING) restore = True - except ConnectionRefusedError: - restore = True except GRPCError as ex_3: status_code = ex_3.status self.message_log(f"Exception buffered_orders 3: {status_code.name}, {ex_3.message}", @@ -1590,7 +1592,7 @@ async def main(self, _symbol): last_state.pop('ms_start_time_ms', str(int(time.time() * 1000))) ) - # TODO Replace after update to 3.0.0 + # TODO Replace after 3.0.2 # self.orders = jsonpickle.decode(last_state.pop(MS_ORDERS, '{}'), keys=True) _orders = last_state.pop(MS_ORDERS, '{}') @@ -1729,6 +1731,10 @@ def reset_vars_ex(self): def refresh_scheduler(self): raise NotImplementedError + @abstractmethod + def stable_state_backtest(self): + raise NotImplementedError + # endregion diff --git a/pyproject.toml b/pyproject.toml index a65097e..33c7c2e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dynamic = ["version", "description"] requires-python = ">=3.9" dependencies = [ - "exchanges-wrapper==2.1.5", + "exchanges-wrapper==2.1.6", "jsonpickle==3.0.2", "psutil==5.9.6", "requests==2.31.0", diff --git a/requirements.txt b/requirements.txt index e007ca7..7a08708 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -exchanges-wrapper==2.1.5 +exchanges-wrapper==2.1.6 jsonpickle==3.0.2 psutil==5.9.6 requests==2.31.0