Skip to content

Commit

Permalink
3.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
DogsTailFarmer committed Apr 14, 2024
1 parent 71d095e commit a35a9b7
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 67 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
## 3.0.4 - 2024-04-14
### Fix
* Creating and manage asynchronous tasks

### Update
* Up requirements for exchanges-wrapper==2.1.9
* 'Backtesting': some minor improvements
* Remove cross-version updates

## 3.0.3 - 2024-04-08
### Update
* Refine templates handling
Expand Down
6 changes: 1 addition & 5 deletions martin_binance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__author__ = "Jerry Fedorenko"
__copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM"
__license__ = "MIT"
__version__ = "3.0.3"
__version__ = "3.0.4"
__maintainer__ = "Jerry Fedorenko"
__contact__ = "https://github.com/DogsTailFarmer"

Expand All @@ -29,10 +29,6 @@
TRIAL_PARAMS = Path(WORK_PATH, "trial_params.json")
EQUAL_STR = "================================================================"

# TODO remove after update to 3.0.3
if CONFIG_FILE.exists() and not TRIAL_PARAMS.exists():
copy(Path(Path(__file__).parent.absolute(), "templates/trial_params.json"), TRIAL_PARAMS)


def init():
if CONFIG_FILE.exists():
Expand Down
8 changes: 1 addition & 7 deletions martin_binance/backtest/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
__author__ = "Jerry Fedorenko"
__copyright__ = "Copyright © 2024 Jerry Fedorenko aka VM"
__license__ = "MIT"
__version__ = "3.0.3"
__version__ = "3.0.4"
__maintainer__ = "Jerry Fedorenko"
__contact__ = "https://github.com/DogsTailFarmer"

Expand Down Expand Up @@ -86,12 +86,6 @@ def objective(_trial):
return _study


async def run_optimize(*args):
process = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
stdout, _ = await process.communicate()
return stdout.splitlines()[0]


if __name__ == "__main__":
logger = logging.getLogger('logger_S')
logger.level = logging.INFO
Expand Down
12 changes: 5 additions & 7 deletions martin_binance/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__author__ = "Jerry Fedorenko"
__copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM"
__license__ = "MIT"
__version__ = "3.0.1"
__version__ = "3.0.4"
__maintainer__ = "Jerry Fedorenko"
__contact__ = "https://github.com/DogsTailFarmer"

Expand Down Expand Up @@ -71,12 +71,12 @@ async def connect(self):
raise UserWarning(f"{ex}, reconnect...") from None
except GRPCError as ex:
status_code = ex.status
logger.warning(f"Exception on register client: {status_code.name}, {ex.message}")
if status_code == Status.FAILED_PRECONDITION:
raise SystemExit(1) from ex
raise UserWarning(f"Exception on register client: {status_code.name}, {ex.message}") from None
raise UserWarning
else:
logger.info(f"gRPC session started for client_id: {_client.client_id}\n"
f"trade_id: {self.trade_id}")
logger.info(f"gRPC session started for client_id: {_client.client_id}, trade_id: {self.trade_id}")
return _client

async def send_request(self, _request, _request_type, **kwargs):
Expand All @@ -85,7 +85,7 @@ async def send_request(self, _request, _request_type, **kwargs):
kwargs['client_id'] = self.client.client_id
kwargs['trade_id'] = self.trade_id
try:
res = await _request(_request_type(**kwargs))
return await _request(_request_type(**kwargs))
except asyncio.CancelledError:
pass # Task cancellation should not be logged as an error
except grpclib.exceptions.StreamTerminatedError:
Expand All @@ -101,8 +101,6 @@ async def send_request(self, _request, _request_type, **kwargs):
raise
except Exception as ex:
logger.error(f"Exception on send request {ex}")
else:
return res

async def for_request(self, _request, _request_type, **kwargs):
if not self.client:
Expand Down
88 changes: 50 additions & 38 deletions martin_binance/strategy_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
__author__ = "Jerry Fedorenko"
__copyright__ = "Copyright © 2021 Jerry Fedorenko aka VM"
__license__ = "MIT"
__version__ = "3.0.2"
__version__ = "3.0.4"
__maintainer__ = "Jerry Fedorenko"
__contact__ = "https://github.com/DogsTailFarmer"

Expand Down Expand Up @@ -39,7 +39,7 @@
import martin_binance.params as prm
from martin_binance import LAST_STATE_PATH, BACKTEST_PATH, HEARTBEAT, KLINES_INIT, EQUAL_STR, ORDER_TIMEOUT
from martin_binance.backtest.exchange_simulator import Account as backTestAccount
from martin_binance.backtest.optimizer import run_optimize, OPTIMIZER, PARAMS_FLOAT
from martin_binance.backtest.optimizer import OPTIMIZER, PARAMS_FLOAT
from martin_binance.client import Trade
from martin_binance.lib import Candle, TradingCapabilityManager, Ticker, FundsEntry, OrderBook, Style, \
any2str, PrivateTrade, Order, convert_from_minute, OrderUpdate, load_file, load_last_state, Klines
Expand All @@ -48,7 +48,7 @@
logger = logging.getLogger('logger_S')
else:
logger = logging.getLogger('logger')
loop = asyncio.get_event_loop()

color_init()

RATE_LIMITER = HEARTBEAT * 5
Expand Down Expand Up @@ -112,7 +112,7 @@ def __init__(self):
self.session_root = None
self.state_file = None
self.operational_status = None
self.tasks_list = []
self.tasks = set()
#
self.time_operational = {'ts': 0.0, 'diff': 0.0, 'new': 0.0} # - See get_time()
self.account = None
Expand All @@ -122,6 +122,7 @@ def __init__(self):
self.tlg_header = '' # - Header for Telegram message
self.start_collect = None
self.s_mode_break = None
self.backtest_process = None
# Init in reset_backtest_vars()
self.s_ticker = None
self.s_order_book = None
Expand Down Expand Up @@ -197,7 +198,7 @@ def get_buffered_ticker(self) -> Ticker:

def get_buffered_funds(self) -> Dict[str, FundsEntry]:
if self.get_time() - self.get_buffered_funds_last_time > self.rate_limiter:
loop.create_task(self.buffered_funds(print_info=False))
self.tasks_manage(self.buffered_funds(print_info=False))
self.get_buffered_funds_last_time = self.get_time()
return {self.base_asset: FundsEntry(self.funds[self.base_asset]),
self.quote_asset: FundsEntry(self.funds[self.quote_asset])}
Expand Down Expand Up @@ -240,22 +241,22 @@ def get_time(self) -> float:

def transfer_to_master(self, symbol: str, amount: str):
if prm.MODE in ('T', 'TC'):
loop.create_task(self.transfer2master(symbol, amount))
self.tasks_manage(self.transfer2master(symbol, amount))

def place_limit_order(self, buy: bool, amount: Decimal, price: Decimal) -> int:
self.order_id += 1
self.message_log(f"Send order id:{self.order_id} for {'BUY' if buy else 'SELL'}"
f" {any2str(amount)} by {any2str(price)} = {any2str(amount * price)}",
color=Style.B_YELLOW)
loop.create_task(self.place_limit_order_timeout(self.order_id))
loop.create_task(self.create_limit_order(self.order_id, buy, any2str(amount), any2str(price)))
self.tasks_manage(self.place_limit_order_timeout(self.order_id))
self.tasks_manage(self.create_limit_order(self.order_id, buy, any2str(amount), any2str(price)))
if self.exchange == 'huobi':
time.sleep(0.02)
return self.order_id

def cancel_order(self, order_id: int, cancel_all=False) -> None:
loop.create_task(self.cancel_order_timeout(order_id))
loop.create_task(self.cancel_order_call(order_id, cancel_all))
self.tasks_manage(self.cancel_order_timeout(order_id))
self.tasks_manage(self.cancel_order_call(order_id, cancel_all))

def message_log(self, msg: str, log_level=logging.INFO, tlg=False, color=Style.WHITE) -> None:
if prm.LOGGING:
Expand Down Expand Up @@ -322,23 +323,32 @@ async def backtest_control(self):
_ts = datetime.now(timezone.utc).replace(tzinfo=None)
storage_name = Path(self.session_root, "_study.db")
try:
_res = await run_optimize(
self.backtest_process = await asyncio.create_subprocess_exec(
OPTIMIZER,
f"{self.exchange}_{self.symbol}",
Path(self.session_root, Path(prm.PARAMS).name),
str(prm.N_TRIALS),
f"sqlite:///{storage_name}",
json.dumps(prm_best or _prm_best),
f"{prm.ID_EXCHANGE}_{prm.SYMBOL}_S.log",

stdout=asyncio.subprocess.PIPE
)
prm_best = orjson.loads(_res)
stdout, _ = await self.backtest_process.communicate()
_res = stdout.splitlines()
if _res:
prm_best = orjson.loads(_res[0])
else:
self.message_log("Backtest control: result is empty", log_level=logging.WARNING)
break
except (asyncio.CancelledError, KeyboardInterrupt):
break
except Exception as err:
self.message_log(f"Backtest control: {err}", log_level=logging.ERROR)
self.message_log(f"Exception traceback: {traceback.format_exc()}", log_level=logging.DEBUG)
break
#
self.backtest_process = None
storage_name.replace(storage_name.with_name('study.db'))
if prm_best:
_prm_best = dict(prm_best)
Expand Down Expand Up @@ -469,7 +479,7 @@ def back_test_handler(self):
print(f"Original time: {original_time}, test time: {test_time}, x = {original_time / test_time:.2f}")
if prm.SAVE_DS:
self._back_test_handler_ext()
loop.stop()
asyncio.get_event_loop().stop()

def _back_test_handler_ext(self):
# Save test data
Expand Down Expand Up @@ -643,6 +653,9 @@ async def ask_exit(self):
self.message_log("Got signal for exit", color=Style.MAGENTA)
self.operational_status = False
self.s_mode_break = True
if self.backtest_process:
self.backtest_process.terminate()
self.message_log("Backtest process was terminated", color=Style.GREEN)
await asyncio.sleep(HEARTBEAT)
if prm.MODE in ('T', 'TC'):
try:
Expand Down Expand Up @@ -821,7 +834,7 @@ async def cancel_order_call(self, _id: int, cancel_all=False, count=0):
self.message_log(f"Cancel order {_id}: Warning, not result getting")
_fetch_order = True
finally:
if _fetch_order:
if prm.MODE in ('T', 'TC') and _fetch_order:
res = await self.fetch_order(_id, _filled_update_call=True)
if res.get('status') in ('CANCELED', 'EXPIRED_IN_MATCH'):
await self.cancel_order_handler(_id, cancel_all)
Expand Down Expand Up @@ -851,6 +864,7 @@ async def cancel_order_timeout(self, _id):
if _id in self.canceled_order_id:
self.canceled_order_id.remove(_id)
self.on_cancel_order_error_string(_id, 'Cancel order timeout')
# await asyncio.sleep(0)

async def transfer2master(self, symbol: str, amount: str):
try:
Expand Down Expand Up @@ -966,7 +980,7 @@ async def buffered_candle(self):
klines[i.value] = kline_i

if len(klines) == len(KLINES_INIT):
self.tasks_list.append(asyncio.ensure_future(self.on_klines_update(klines)))
self.tasks_manage(self.on_klines_update(klines))
else:
self.message_log("Init buffered candle failed. try one else...", log_level=logging.WARNING)
await asyncio.sleep(random.uniform(1, 5))
Expand Down Expand Up @@ -998,7 +1012,7 @@ async def on_klines_update(self, _klines: {str: Klines}):
self.wss_fire_up = True
else:
for i in _intervals:
loop.create_task(self.aiter_candles(_klines, i))
self.tasks_manage(self.aiter_candles(_klines, i))

async def create_limit_order(self, _id: int, buy: bool, amount: str, price: str) -> None:
self.wait_order_id.append(_id)
Expand Down Expand Up @@ -1046,7 +1060,7 @@ async def create_limit_order(self, _id: int, buy: bool, amount: str, price: str)
else:
_fetch_order = True
finally:
if _fetch_order:
if prm.MODE in ('T', 'TC') and _fetch_order:
await asyncio.sleep(HEARTBEAT)
res = await self.fetch_order(0, str(_id), _filled_update_call=True)
if res.get('status') in ('NEW', 'PARTIALLY_FILLED', 'FILLED'):
Expand Down Expand Up @@ -1376,18 +1390,23 @@ async def wait_wss_init(self):
if res.success:
self.operational_status = True

def tasks_manage(self, coro):
_t = asyncio.create_task(coro)
self.tasks.add(_t)
_t.add_done_callback(self.tasks.discard)

async def wss_declare(self):
# Market stream
self.tasks_list.append(asyncio.ensure_future(self.on_ticker_update()))
self.tasks_manage(self.on_ticker_update())
await self.buffered_candle()
self.tasks_list.append(asyncio.ensure_future(self.on_order_book_update()))
self.tasks_manage(self.on_order_book_update())
if prm.MODE in ('T', 'TC'):
# User Stream
self.tasks_list.append(asyncio.ensure_future(self.on_funds_update()))
self.tasks_list.append(asyncio.ensure_future(self.on_order_update()))
self.tasks_list.append(asyncio.ensure_future(self.on_balance_update()))
self.tasks_manage(self.on_funds_update())
self.tasks_manage(self.on_order_update())
self.tasks_manage(self.on_balance_update())
if prm.MODE == 'TC':
self.tasks_list.append(asyncio.ensure_future(self.backtest_control()))
self.tasks_manage(self.backtest_control())

async def wss_init(self, update_max_queue_size=False):
if self.client_id:
Expand Down Expand Up @@ -1416,8 +1435,8 @@ async def wss_init(self, update_max_queue_size=False):
self.wss_fire_up = True

def task_cancel(self):
[task.cancel() for task in self.tasks_list if not task.done()]
self.tasks_list.clear()
[task.cancel() for task in self.tasks if not task.done()]
self.tasks.clear()

async def main(self, _symbol):
restore_state = None
Expand Down Expand Up @@ -1486,7 +1505,7 @@ async def main(self, _symbol):
else:
[exch_orders_ids.append(int(_o['orderId'])) for _o in active_orders]
# Init section
loop.create_task(self.get_exchange_info(send_request, _symbol))
self.tasks_manage(self.get_exchange_info(send_request, _symbol))
while not self.info_symbol:
await asyncio.sleep(0.1)
if prm.LOGGING:
Expand Down Expand Up @@ -1592,14 +1611,7 @@ async def main(self, _symbol):
last_state.pop('ms_start_time_ms', str(int(time.time() * 1000)))
)

# TODO Replace after 3.0.2
# self.orders = jsonpickle.decode(last_state.pop(MS_ORDERS, '{}'), keys=True)

_orders = last_state.pop(MS_ORDERS, '{}')
_orders = _orders.replace('margin_wrapper', 'lib')
self.orders = jsonpickle.decode(_orders, keys=True)
#

self.orders = jsonpickle.decode(last_state.pop(MS_ORDERS, '{}'), keys=True)
orders_keys = self.orders.keys()
for _id in exch_orders_ids:
if _id not in orders_keys:
Expand Down Expand Up @@ -1641,14 +1653,14 @@ async def main(self, _symbol):
if prm.MODE in ('T', 'TC'):
await self.wss_init()
await self.wait_wss_init()
loop.create_task(save_to_csv())
loop.create_task(self.buffered_orders())
self.tasks_manage(save_to_csv())
self.tasks_manage(self.buffered_orders())
if self.session.client.real_market:
loop.create_task(self.save_asset())
self.tasks_manage(self.save_asset())
if not restore_state:
self.start()

loop.create_task(self.heartbeat(self.session))
self.tasks_manage(self.heartbeat(self.session))

except (KeyboardInterrupt, SystemExit):
# noinspection PyProtectedMember, PyUnresolvedReferences
Expand Down
4 changes: 2 additions & 2 deletions martin_binance/templates/cli_0_BTCUSDT.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ def trade(strategy=None):
loop.create_task(strategy.main(ex.SYMBOL))
loop.run_forever()
except KeyboardInterrupt:
pass
pass # user interrupt
finally:
try:
loop.run_until_complete(strategy.ask_exit())
except (asyncio.CancelledError, KeyboardInterrupt):
pass
pass # user interrupt
except Exception as _err:
print(f"Error: {_err}")
loop.run_until_complete(loop.shutdown_asyncgens())
Expand Down
4 changes: 2 additions & 2 deletions martin_binance/templates/cli_1_BTCUSDT.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ def trade(strategy=None):
loop.create_task(strategy.main(ex.SYMBOL))
loop.run_forever()
except KeyboardInterrupt:
pass
pass # user interrupt
finally:
try:
loop.run_until_complete(strategy.ask_exit())
except (asyncio.CancelledError, KeyboardInterrupt):
pass
pass # user interrupt
except Exception as _err:
print(f"Error: {_err}")
loop.run_until_complete(loop.shutdown_asyncgens())
Expand Down
Loading

0 comments on commit a35a9b7

Please sign in to comment.