From 9749cc0eb2321a26a1e70fb0f3864665ac8af4d0 Mon Sep 17 00:00:00 2001 From: DogsTailFarmer Date: Tue, 26 Mar 2024 16:09:42 +0300 Subject: [PATCH] 3.0.1rc7 --- CHANGELOG.md | 3 +- martin_binance/__init__.py | 2 +- martin_binance/client.py | 30 ++-- martin_binance/executor.py | 305 +++++++++++++++++--------------- martin_binance/params.py | 1 + martin_binance/strategy_base.py | 14 +- pyproject.toml | 2 +- requirements.txt | 2 +- 8 files changed, 187 insertions(+), 172 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1739118..8917a35 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ -## 3.0.1rc6 - 2024-03-23 +## 3.0.1rc7 - 2024-03-26 ### Update * Refactoring processing periodically events +* Up requirements for exchanges-wrapper==2.1.5 ## 3.0.1rc3 - 2024-03-21 ### Added for new features diff --git a/martin_binance/__init__.py b/martin_binance/__init__.py index 8100584..a128487 100755 --- a/martin_binance/__init__.py +++ b/martin_binance/__init__.py @@ -6,7 +6,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc6" +__version__ = "3.0.1rc7" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" diff --git a/martin_binance/client.py b/martin_binance/client.py index cf65494..030d92a 100644 --- a/martin_binance/client.py +++ b/martin_binance/client.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc3" +__version__ = "3.0.1rc7" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -16,6 +16,7 @@ import grpclib.exceptions import shortuuid +from martin_binance import ORDER_TIMEOUT from exchanges_wrapper import martin as mr, Channel, Status, GRPCError logger = logging.getLogger('logger.client') @@ -27,8 +28,8 @@ class Trade: def __init__(self, account_name, rate_limiter, symbol): - self.channel = Channel('127.0.0.1', 50051) - self.stub = mr.MartinStub(self.channel) + self.channel = None + self.stub = None self.account_name = account_name self.rate_limiter = rate_limiter self.symbol = symbol @@ -42,10 +43,14 @@ async def get_client(self): self.wait_connection = True client = None while client is None: + self.channel = Channel('127.0.0.1', 50051) + self.stub = mr.MartinStub(self.channel) try: client = await self.connect() except UserWarning: client = None + self.channel.close() + await asyncio.sleep(random.randint(5, 30)) else: self.client = client self.wait_connection = False @@ -64,18 +69,12 @@ async def connect(self): except asyncio.CancelledError: pass # Task cancellation should not be logged as an error. except ConnectionRefusedError as ex: - logger.error(f"{ex}, reconnect...") - await asyncio.sleep(random.randint(5, 30)) - raise UserWarning from ex + raise UserWarning(f"{ex}, reconnect...") from None except GRPCError as ex: status_code = ex.status - logger.error(f"Exception on register client: {status_code.name}, {ex.message}") if status_code == Status.FAILED_PRECONDITION: raise SystemExit(1) from ex - logger.warning('Restart gRPC client session') - await asyncio.sleep(random.randint(5, 30)) - raise UserWarning from ex - + raise UserWarning(f"Exception on register client: {status_code.name}, {ex.message}") from None else: logger.info(f"gRPC session started for client_id: {_client.client_id}\n" f"trade_id: {self.trade_id}") @@ -87,20 +86,21 @@ async def send_request(self, _request, _request_type, **kwargs): kwargs['client_id'] = self.client.client_id kwargs['trade_id'] = self.trade_id try: - res = await _request(_request_type(**kwargs)) + res = await asyncio.wait_for(_request(_request_type(**kwargs)), ORDER_TIMEOUT) except asyncio.CancelledError: pass # Task cancellation should not be logged as an error except grpclib.exceptions.StreamTerminatedError: raise UserWarning("Have not connection to gRPC server") except ConnectionRefusedError: raise UserWarning("Connection to gRPC server broken") + except asyncio.TimeoutError: + self.channel.close() + raise UserWarning("gRCP request timeout error") except GRPCError as ex: status_code = ex.status if status_code == Status.UNAVAILABLE: self.client = None - raise UserWarning( - "Connection to gRPC server failed, wait connection..." - ) from ex + raise UserWarning("Wait connection to gRPC server") from None logger.debug(f"Send request {_request}: {status_code.name}, {ex.message}") raise except Exception as ex: diff --git a/martin_binance/executor.py b/martin_binance/executor.py index 2539c9f..230b900 100755 --- a/martin_binance/executor.py +++ b/martin_binance/executor.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc6" +__version__ = "3.0.1rc7" __maintainer__ = "Jerry Fedorenko" __contact__ = 'https://github.com/DogsTailFarmer' ################################################################## @@ -31,8 +31,7 @@ from martin_binance.db_utils import db_management, save_to_db from martin_binance.telegram_utils import telegram from martin_binance.strategy_base import StrategyBase, __version__ as msb_ver -from martin_binance.lib import Ticker, FundsEntry, OrderBook, Style, any2str, Order, OrderUpdate, Orders, \ - f2d, solve +from martin_binance.lib import Ticker, FundsEntry, OrderBook, Style, any2str, Order, OrderUpdate, Orders, f2d, solve from martin_binance.params import * O_DEC = Decimal() @@ -92,7 +91,7 @@ def __init__(self, call_super=True): self.cycle_status = () # - Operational status for current cycle, orders count self.cycle_time_reverse = None # + Reverse cycle start time self.first_run = True # - - self.grid_only_restart = None # - + self.grid_only_restart = 0 # + Time to restart GRID_ONLY mode self.grid_remove = None # + Flag when starting cancel grid orders self.grid_update_started = None # - Flag when grid update process started self.last_ticker_update = 0 # - @@ -120,11 +119,13 @@ def __init__(self, call_super=True): self.ts_grid_update = self.get_time() # - When updated grid self.wait_wss_refresh = {} # - # - schedule.every().minute.do(self.event_export_operational_status) - schedule.every(10).seconds.do(self.event_get_command_tlg) - schedule.every(6).seconds.do(self.event_report) schedule.every(5).minutes.do(self.event_grid_update) schedule.every(5).seconds.do(self.event_processing) + schedule.every(2).seconds.do(self.event_exec_command) + if MODE in ('T', 'TC'): + schedule.every().minute.do(self.event_export_operational_status) + schedule.every(10).seconds.do(self.event_get_command_tlg) + schedule.every(6).seconds.do(self.event_report) def init(self, check_funds=True) -> None: # skipcq: PYL-W0221 self.message_log('Start Init section') @@ -179,6 +180,10 @@ def init(self, check_funds=True) -> None: # skipcq: PYL-W0221 ds = ds.available if ds else O_DEC if USE_ALL_FUND: self.deposit_second = self.round_truncate(ds, base=False) + elif START_ON_BUY and AMOUNT_FIRST: + self.message_log(f"Keep {self.f_currency} level at {AMOUNT_FIRST}" + f" by {AMOUNT_SECOND} {self.s_currency} tranche", + color=Style.B_WHITE) elif self.deposit_second > ds: self.message_log('Not enough second coin for Buy cycle!', color=Style.B_RED) raise SystemExit(1) @@ -197,7 +202,6 @@ def init(self, check_funds=True) -> None: # skipcq: PYL-W0221 def save_strategy_state(self) -> Dict[str, str]: return { 'command': json.dumps(self.command), - 'grid_remove': json.dumps(self.grid_remove), 'cycle_buy': json.dumps(self.cycle_buy), 'cycle_buy_count': json.dumps(self.cycle_buy_count), 'cycle_sell_count': json.dumps(self.cycle_sell_count), @@ -205,6 +209,12 @@ def save_strategy_state(self) -> Dict[str, str]: 'cycle_time_reverse': json.dumps(self.cycle_time_reverse, default=str), 'deposit_first': json.dumps(self.deposit_first), 'deposit_second': json.dumps(self.deposit_second), + 'grid_only_restart': json.dumps(self.grid_only_restart), + 'grid_remove': json.dumps(self.grid_remove), + 'initial_first': json.dumps(self.initial_first), + 'initial_reverse_first': json.dumps(self.initial_reverse_first), + 'initial_reverse_second': json.dumps(self.initial_reverse_second), + 'initial_second': json.dumps(self.initial_second), 'martin': json.dumps(self.martin), 'order_q': json.dumps(self.order_q), 'orders': json.dumps(self.orders_grid.get()), @@ -212,12 +222,9 @@ def save_strategy_state(self) -> Dict[str, str]: 'orders_save': json.dumps(self.orders_save.get()), 'over_price': json.dumps(self.over_price), 'part_amount': json.dumps(str(self.part_amount)), - 'initial_first': json.dumps(self.initial_first), - 'initial_second': json.dumps(self.initial_second), - 'initial_reverse_first': json.dumps(self.initial_reverse_first), - 'initial_reverse_second': json.dumps(self.initial_reverse_second), 'profit_first': json.dumps(self.profit_first), 'profit_second': json.dumps(self.profit_second), + 'restore_orders': json.dumps(self.restore_orders), 'reverse': json.dumps(self.reverse), 'reverse_hold': json.dumps(self.reverse_hold), 'reverse_init_amount': json.dumps(self.reverse_init_amount), @@ -230,49 +237,46 @@ def save_strategy_state(self) -> Dict[str, str]: 'sum_profit_first': json.dumps(self.sum_profit_first), 'sum_profit_second': json.dumps(self.sum_profit_second), 'tp_amount': json.dumps(self.tp_amount), + 'tp_order': json.dumps(str(self.tp_order)), 'tp_order_id': json.dumps(self.tp_order_id), 'tp_part_amount_first': json.dumps(self.tp_part_amount_first), 'tp_part_amount_second': json.dumps(self.tp_part_amount_second), + 'tp_part_free': json.dumps(self.tp_part_free), 'tp_target': json.dumps(self.tp_target), - 'tp_order': json.dumps(str(self.tp_order)), - 'tp_wait_id': json.dumps(self.tp_wait_id), - 'restore_orders': json.dumps(self.restore_orders), - 'tp_part_free': json.dumps(self.tp_part_free) + 'tp_wait_id': json.dumps(self.tp_wait_id) } def event_export_operational_status(self): - if MODE in ('T', 'TC'): - has_grid_hold_timeout = self.grid_hold.get('timestamp') and \ - int(self.get_time() - self.grid_hold['timestamp']) > HOLD_TP_ORDER_TIMEOUT - has_tp_order_hold_timeout = self.tp_order_hold.get('timestamp') and \ - int(self.get_time() - self.tp_order_hold['timestamp']) > HOLD_TP_ORDER_TIMEOUT - - if self.stable_state() or has_grid_hold_timeout or has_tp_order_hold_timeout: - orders = self.get_buffered_open_orders() - order_buy = len([i for i in orders if i.buy is True]) - order_sell = len([i for i in orders if i.buy is False]) - order_hold = len(self.orders_hold) - cycle_status = (self.cycle_buy, order_buy, order_sell, order_hold) - if self.cycle_status != cycle_status: - self.cycle_status = cycle_status - if self.queue_to_db: - self.queue_to_db.put( - { - 'ID_EXCHANGE': ID_EXCHANGE, - 'f_currency': self.f_currency, - 's_currency': self.s_currency, - 'cycle_buy': self.cycle_buy, - 'order_buy': order_buy, - 'order_sell': order_sell, - 'order_hold': order_hold, - 'destination': 't_orders' - } - ) - else: - self.cycle_status = () + ts = self.get_time() + has_grid_hold_timeout = ts - self.grid_hold.get('timestamp', ts) > HOLD_TP_ORDER_TIMEOUT + has_tp_order_hold_timeout = ts - self.tp_order_hold.get('timestamp', ts) > HOLD_TP_ORDER_TIMEOUT + + if self.stable_state() or has_grid_hold_timeout or has_tp_order_hold_timeout: + orders = self.get_buffered_open_orders() + order_buy = len([i for i in orders if i.buy is True]) + order_sell = len([i for i in orders if i.buy is False]) + order_hold = len(self.orders_hold) + cycle_status = (self.cycle_buy, order_buy, order_sell, order_hold) + if self.cycle_status != cycle_status: + self.cycle_status = cycle_status + if self.queue_to_db: + self.queue_to_db.put( + { + 'ID_EXCHANGE': ID_EXCHANGE, + 'f_currency': self.f_currency, + 's_currency': self.s_currency, + 'cycle_buy': self.cycle_buy, + 'order_buy': order_buy, + 'order_sell': order_sell, + 'order_hold': order_hold, + 'destination': 't_orders' + } + ) + else: + self.cycle_status = () def event_get_command_tlg(self): - if MODE in ('T', 'TC') and self.connection_analytic: + if self.connection_analytic: cursor_analytic = self.connection_analytic.cursor() bot_id = self.tlg_header.split('.')[0] try: @@ -301,99 +305,99 @@ def event_get_command_tlg(self): self.connection_analytic.commit() except sqlite3.Error as err: print(f"UPDATE t_control: {err}") - if self.command == 'stopped': - if isinstance(self.start_collect, int): - if self.start_collect < 5: - self.start_collect += 1 - else: - self.start_collect = False - elif self.command == 'restart': + + def event_exec_command(self): + if self.command == 'stopped' and isinstance(self.start_collect, int): + if self.start_collect < 5: + self.start_collect += 1 + else: + self.start_collect = False + if self.command == 'restart': self.stop() os.execv(sys.executable, [sys.executable] + [sys.argv[0]] + ['1']) def event_report(self): - if MODE in ('T', 'TC'): - is_time_for_report_update = STATUS_DELAY and (self.get_time() - self.status_time) / 60 > STATUS_DELAY - if self.command == 'status' or is_time_for_report_update: - self.command = None - last_price = self.get_buffered_ticker().last_price - ticker_update = int(self.get_time()) - self.last_ticker_update - if self.cycle_time: - ct = str(datetime.now(timezone.utc).replace(tzinfo=None) - self.cycle_time).rsplit('.')[0] - else: - self.message_log("save_strategy_state: cycle_time is None!", log_level=logging.DEBUG) - ct = str(datetime.now(timezone.utc)).rsplit('.')[0] - if self.command == 'stopped': - self.message_log("Strategy stopped. Need manual action", tlg=True) - elif self.grid_hold or self.tp_order_hold: - funds = self.get_buffered_funds() - fund_f = funds.get(self.f_currency, O_DEC) - fund_f = fund_f.available if fund_f else O_DEC - fund_s = funds.get(self.s_currency, O_DEC) - fund_s = fund_s.available if fund_s else O_DEC - if self.grid_hold.get('timestamp'): - time_diff = int(self.get_time() - self.grid_hold['timestamp']) - self.message_log(f"Exist unreleased grid orders for\n" - f"{'Buy' if self.cycle_buy else 'Sell'} cycle with" - f" {self.grid_hold['depo']}" - f" {self.s_currency if self.cycle_buy else self.f_currency} depo.\n" - f"Available first: {fund_f} {self.f_currency}\n" - f"Available second: {fund_s} {self.s_currency}\n" + is_time_for_report_update = STATUS_DELAY and (self.get_time() - self.status_time) / 60 > STATUS_DELAY + if self.command == 'status' or is_time_for_report_update: + self.command = None + last_price = self.get_buffered_ticker().last_price + ticker_update = int(self.get_time()) - self.last_ticker_update + if self.cycle_time: + ct = str(datetime.now(timezone.utc).replace(tzinfo=None) - self.cycle_time).rsplit('.')[0] + else: + self.message_log("save_strategy_state: cycle_time is None!", log_level=logging.DEBUG) + ct = str(datetime.now(timezone.utc)).rsplit('.')[0] + if self.command == 'stopped': + self.message_log("Strategy stopped. Need manual action", tlg=True) + elif self.grid_hold or self.tp_order_hold: + funds = self.get_buffered_funds() + fund_f = funds.get(self.f_currency, O_DEC) + fund_f = fund_f.available if fund_f else O_DEC + fund_s = funds.get(self.s_currency, O_DEC) + fund_s = fund_s.available if fund_s else O_DEC + if self.grid_hold.get('timestamp'): + time_diff = int(self.get_time() - self.grid_hold['timestamp']) + self.message_log(f"Exist unreleased grid orders for\n" + f"{'Buy' if self.cycle_buy else 'Sell'} cycle with" + f" {self.grid_hold['depo']}" + f" {self.s_currency if self.cycle_buy else self.f_currency} depo.\n" + f"Available first: {fund_f} {self.f_currency}\n" + f"Available second: {fund_s} {self.s_currency}\n" + f"Last ticker price: {last_price}\n" + f"WSS status: {ticker_update}s\n" + f"From start {ct}\n" + f"Delay: {time_diff} sec", tlg=True) + elif self.tp_order_hold.get('timestamp'): + time_diff = int(self.get_time() - self.tp_order_hold['timestamp']) + if time_diff > HOLD_TP_ORDER_TIMEOUT: + self.message_log(f"Exist hold TP order on {self.tp_order_hold['amount']}" + f" {self.f_currency if self.cycle_buy else self.s_currency}\n" + f"Available first:{fund_f} {self.f_currency}\n" + f"Available second:{fund_s} {self.s_currency}\n" f"Last ticker price: {last_price}\n" f"WSS status: {ticker_update}s\n" f"From start {ct}\n" f"Delay: {time_diff} sec", tlg=True) - elif self.tp_order_hold.get('timestamp'): - time_diff = int(self.get_time() - self.tp_order_hold['timestamp']) - if time_diff > HOLD_TP_ORDER_TIMEOUT: - self.message_log(f"Exist hold TP order on {self.tp_order_hold['amount']}" - f" {self.f_currency if self.cycle_buy else self.s_currency}\n" - f"Available first:{fund_f} {self.f_currency}\n" - f"Available second:{fund_s} {self.s_currency}\n" - f"Last ticker price: {last_price}\n" - f"WSS status: {ticker_update}s\n" - f"From start {ct}\n" - f"Delay: {time_diff} sec", tlg=True) + else: + if self.cycle_status: + order_buy = self.cycle_status[1] + order_sell = self.cycle_status[2] + order_hold = self.cycle_status[3] else: - if self.cycle_status: - order_buy = self.cycle_status[1] - order_sell = self.cycle_status[2] - order_hold = self.cycle_status[3] - else: - orders = self.get_buffered_open_orders() - order_buy = len([i for i in orders if i.buy is True]) - order_sell = len([i for i in orders if i.buy is False]) - order_hold = len(self.orders_hold) - sum_profit = self.round_truncate(self.sum_profit_first * self.avg_rate + self.sum_profit_second, - base=False) - command = bool(self.command in ('end', 'stop')) - if GRID_ONLY: - header = (f"{'Buy' if self.cycle_buy else 'Sell'} assets Grid only mode\n" - f"{('Waiting funding for convert' + chr(10)) if self.grid_only_restart else ''}" - f"{self.get_free_assets()[3]}" - ) - else: - header = (f"Complete {self.cycle_buy_count} buy cycle and {self.cycle_sell_count} sell cycle\n" - f"For all cycles profit:\n" - f"First: {self.sum_profit_first}\n" - f"Second: {self.sum_profit_second}\n" - f"Summary: {sum_profit}\n" - f"{self.get_free_assets(mode='free')[3]}" - ) - self.message_log(f"{header}\n" - f"{'*** Shift grid mode ***' if self.shift_grid_threshold else '* ** ** ** *'}\n" - f"{'Buy' if self.cycle_buy else 'Sell'}{' Reverse' if self.reverse else ''}" - f"{' Hold reverse' if self.reverse_hold else ''} {MODE}-cycle with" - f" {order_buy} buy and {order_sell} sell active orders.\n" - f"{order_hold or 'No'} hold grid orders\n" - f"Over price: {self.over_price:.2f}%\n" - f"Last ticker price: {last_price}\n" - f"ver: {HEAD_VERSION}+{__version__}+{msb_ver}\n" - f"From start {ct}\n" - f"WSS status: {ticker_update}s\n" - f"{'- *** *** *** -' if self.command == 'stop' else ''}\n" - f"{'Waiting for end of cycle for manual action' if command else ''}", - tlg=True) + orders = self.get_buffered_open_orders() + order_buy = len([i for i in orders if i.buy is True]) + order_sell = len([i for i in orders if i.buy is False]) + order_hold = len(self.orders_hold) + sum_profit = self.round_truncate(self.sum_profit_first * self.avg_rate + self.sum_profit_second, + base=False) + command = bool(self.command in ('end', 'stop')) + if GRID_ONLY: + header = (f"{'Buy' if self.cycle_buy else 'Sell'} assets Grid only mode\n" + f"{('Waiting funding for convert' + chr(10)) if self.grid_only_restart else ''}" + f"{self.get_free_assets()[3]}" + ) + else: + header = (f"Complete {self.cycle_buy_count} buy cycle and {self.cycle_sell_count} sell cycle\n" + f"For all cycles profit:\n" + f"First: {self.sum_profit_first}\n" + f"Second: {self.sum_profit_second}\n" + f"Summary: {sum_profit}\n" + f"{self.get_free_assets(mode='free')[3]}" + ) + self.message_log(f"{header}\n" + f"{'*** Shift grid mode ***' if self.shift_grid_threshold else '* ** ** ** *'}\n" + f"{'Buy' if self.cycle_buy else 'Sell'}{' Reverse' if self.reverse else ''}" + f"{' Hold reverse' if self.reverse_hold else ''} {MODE}-cycle with" + f" {order_buy} buy and {order_sell} sell active orders.\n" + f"{order_hold or 'No'} hold grid orders\n" + f"Over price: {self.over_price:.2f}%\n" + f"Last ticker price: {last_price}\n" + f"ver: {HEAD_VERSION}+{__version__}+{msb_ver}\n" + f"From start {ct}\n" + f"WSS status: {ticker_update}s\n" + f"{'- *** *** *** -' if self.command == 'stop' else ''}\n" + f"{'Waiting for end of cycle for manual action' if command else ''}", + tlg=True) def refresh_scheduler(self): schedule.run_pending() @@ -434,6 +438,12 @@ def event_processing(self): self.start() else: self.start_reverse_time = self.get_time() + if self.grid_only_restart and self.get_time() > self.grid_only_restart and START_ON_BUY and AMOUNT_FIRST: + ff, fs, _, _ = self.get_free_assets(mode='available') + if ff < AMOUNT_FIRST and fs > AMOUNT_SECOND: + self.grid_only_restart = 0 + self.sum_amount_first = self.sum_amount_second = O_DEC + self.start() def stable_state(self): return ( @@ -444,6 +454,7 @@ def stable_state(self): and not self.grid_update_started and not self.start_after_shift and not self.tp_hold + and not self.tp_order_hold and not self.tp_was_filled and not self.orders_init and self.command != 'stopped' @@ -451,12 +462,12 @@ def stable_state(self): def restore_strategy_state(self, strategy_state: Dict[str, str] = None, restore=True) -> None: if strategy_state: - # Restore from file if lose state only self.message_log("Restore strategy state from saved state:", log_level=logging.INFO) self.message_log("\n".join(f"{k}\t{v}" for k, v in strategy_state.items()), log_level=logging.DEBUG) # self.command = json.loads(strategy_state.get('command')) self.grid_remove = json.loads(strategy_state.get('grid_remove', 'null')) + self.grid_only_restart = json.loads(strategy_state.get('grid_only_restart', 0)) # self.cycle_buy = json.loads(strategy_state.get('cycle_buy')) self.cycle_buy_count = json.loads(strategy_state.get('cycle_buy_count')) @@ -528,7 +539,7 @@ def restore_strategy_state(self, strategy_state: Dict[str, str] = None, restore= if self.grid_remove: self.message_log("Restore, continue cancel grid orders", tlg=True) self.cancel_grid() - if not grid_open_orders_len and self.orders_hold: + elif not grid_open_orders_len and self.orders_hold: self.message_log("Restore, no grid orders, place from hold now", tlg=True) self.place_grid_part() elif not self.orders_grid and not self.orders_hold and not self.orders_save and not self.tp_order_id: @@ -640,10 +651,11 @@ def start(self, profit_f: Decimal = O_DEC, profit_s: Decimal = O_DEC) -> None: self.deposit_first = ff self.message_log(f'Use all available funds: {self.deposit_first} {self.f_currency}') self.save_init_assets(ff, fs) - if not self.check_min_amount(for_tp=False) and self.command is None: + if (START_ON_BUY and AMOUNT_FIRST and (ff >= AMOUNT_FIRST or fs < AMOUNT_SECOND)) \ + or not self.check_min_amount(for_tp=False): self.first_run = False - self.grid_only_restart = True - self.message_log("Waiting funding for convert", color=Style.B_WHITE) + self.grid_only_restart = self.get_time() + self.message_log("Waiting for conditions for conversion", color=Style.B_WHITE) return if not self.first_run and not self.start_after_shift and not self.reverse and not GRID_ONLY: self.message_log(f"Complete {self.cycle_buy_count} buy cycle and {self.cycle_sell_count} sell cycle\n" @@ -699,7 +711,6 @@ def start(self, profit_f: Decimal = O_DEC, profit_s: Decimal = O_DEC) -> None: color=Style.B_WHITE) self.debug_output() if MODE in ('TC', 'S') and self.start_collect is None: - self.start_collect = True self.first_run = False self.place_grid(self.cycle_buy, amount, self.reverse_target_amount) @@ -1808,9 +1819,14 @@ def grid_only_stop(self) -> None: f"Average rate is {avg_rate}", tlg=True) self.sum_amount_first = self.sum_amount_second = O_DEC if USE_ALL_FUND: - self.grid_only_restart = True + self.grid_only_restart = self.get_time() self.message_log("Waiting funding for convert", color=Style.B_WHITE) return + if START_ON_BUY and AMOUNT_FIRST: + self.deposit_second = AMOUNT_SECOND + self.grid_only_restart = self.get_time() + GRID_ONLY_DELAY + self.message_log(f"Keep the level {self.f_currency} at {AMOUNT_FIRST}", color=Style.B_WHITE) + return self.command = 'stop' def grid_handler( @@ -2175,10 +2191,13 @@ def on_balance_update_ex(self, balance: Dict) -> None: self.deposit_second += delta self.initial_reverse_second += delta else: - if delta < 0 and abs(delta) > self.initial_second - self.deposit_second: - self.deposit_second = self.initial_second + delta - elif delta > 0: - self.deposit_second += delta + if GRID_ONLY and START_ON_BUY and AMOUNT_FIRST: + self.message_log("Deposit is not updated for First asset level control mode") + else: + if delta < 0 and abs(delta) > self.initial_second - self.deposit_second: + self.deposit_second = self.initial_second + delta + elif delta > 0: + self.deposit_second += delta self.initial_second += delta elif asset == self.f_currency and not GRID_ONLY: if self.reverse: @@ -2207,9 +2226,9 @@ def on_balance_update_ex(self, balance: Dict) -> None: self.initial_second += delta self.message_log(f"Was {'depositing' if delta > 0 else 'transferring (withdrawing)'} {delta} {asset}", color=Style.UNDERLINE, tlg=True) - if (self.grid_only_restart or (GRID_ONLY and USE_ALL_FUND)) and restart: + if restart and self.grid_only_restart and USE_ALL_FUND: self.restart = True - self.grid_only_restart = None + self.grid_only_restart = 0 self.grid_remove = None self.cancel_grid(cancel_all=True) diff --git a/martin_binance/params.py b/martin_binance/params.py index 87a9ff4..80db318 100644 --- a/martin_binance/params.py +++ b/martin_binance/params.py @@ -45,6 +45,7 @@ LOG_LEVEL = logging.DEBUG # Default level for console output HOLD_TP_ORDER_TIMEOUT = 30 COLLECT_ASSETS = bool() +GRID_ONLY_DELAY = 30 # sec delay before try restart GRID_ONLY cycle # ADAPTIVE_TRADE_CONDITION = bool() BB_CANDLE_SIZE_IN_MINUTES = int() diff --git a/martin_binance/strategy_base.py b/martin_binance/strategy_base.py index 17afe81..018d6b4 100644 --- a/martin_binance/strategy_base.py +++ b/martin_binance/strategy_base.py @@ -4,7 +4,7 @@ __author__ = "Jerry Fedorenko" __copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM" __license__ = "MIT" -__version__ = "3.0.1rc6" +__version__ = "3.0.1rc7" __maintainer__ = "Jerry Fedorenko" __contact__ = "https://github.com/DogsTailFarmer" @@ -87,7 +87,6 @@ def __init__(self): self.client = None self.exchange = str() self.symbol = str() - self.channel = None self.stub = mr.MartinStub self.client_id = int() self.info_symbol = {} @@ -175,7 +174,6 @@ def reset_vars(self): def update_vars(self, _session): self.client = _session.client self.stub = _session.stub - self.channel = _session.channel self.client_id = _session.client.client_id if _session.client else None self.exchange = _session.client.exchange if _session.client else None self.send_request = _session.send_request @@ -651,6 +649,7 @@ async def ask_exit(self): if prm.MODE in ('T', 'TC'): try: await self.send_request(self.stub.stop_stream, mr.MarketRequest, symbol=self.symbol) + self.session.channel.close() except Exception as ex: self.message_log(f"ask_exit: {ex}", log_level=logging.WARNING) @@ -661,21 +660,16 @@ async def ask_exit(self): self.start_collect = False self.session_data_handler() - if self.channel: - self.channel.close() + if prm.LAST_STATE_FILE.exists(): + print(f"Current state saved into {prm.LAST_STATE_FILE}") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] [task.cancel() for task in tasks] await asyncio.gather(*tasks, return_exceptions=True) if prm.LOGGING: print(f"Cancelling {len(tasks)} outstanding tasks") - if prm.LAST_STATE_FILE.exists(): - print(f"Current state saved into {prm.LAST_STATE_FILE}") - try: self.stop() - except Exception as _err: - print(f"ask_exit.strategy.stop: {_err}") async def fetch_order(self, _id: int, _client_order_id: str = None, _filled_update_call=False): try: diff --git a/pyproject.toml b/pyproject.toml index 1ef4284..a65097e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ dynamic = ["version", "description"] requires-python = ">=3.9" dependencies = [ - "exchanges-wrapper==2.1.3", + "exchanges-wrapper==2.1.5", "jsonpickle==3.0.2", "psutil==5.9.6", "requests==2.31.0", diff --git a/requirements.txt b/requirements.txt index 8c5bf45..e007ca7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -exchanges-wrapper==2.1.3 +exchanges-wrapper==2.1.5 jsonpickle==3.0.2 psutil==5.9.6 requests==2.31.0