Skip to content

Commit

Permalink
1.2.4-17 refactoring wss for market events
Browse files Browse the repository at this point in the history
  • Loading branch information
DogsTailFarmer committed Sep 15, 2022
1 parent fafdf1b commit 04ae21f
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 22 deletions.
2 changes: 1 addition & 1 deletion exchanges_wrapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__contact__ = "https://github.com/DogsTailFarmer"
__email__ = "[email protected]"
__credits__ = ["https://github.com/DanyaSWorlD"]
__version__ = "1.2.4-16"
__version__ = "1.2.4-17"

from pathlib import Path
import shutil
Expand Down
1 change: 0 additions & 1 deletion exchanges_wrapper/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ async def stop_user_events_listener(self):

async def start_market_events_listener(self, _trade_id):
_events = self.events.registered_streams
logger.info(f"start_market_events_listener._events {_events}")
start_list = []

for _exchange in _events.keys():
Expand Down
46 changes: 27 additions & 19 deletions exchanges_wrapper/exch_srv.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,17 +446,22 @@ async def OnKlinesUpdate(self, request: api_pb2.FetchKlinesRequest,
for i in _intervals:
_event_type = f"{_symbol}@kline_{i}"
event_types.append(_event_type)
client.events.register_event(functools.partial(on_klines_update, _queue), _event_type, exchange)
client.events.register_event(functools.partial(
on_klines_update, _queue, client, request.trade_id, _event_type),
_event_type, exchange)
while True:
_event = await _queue.get()
if isinstance(_event, str) and _event == request.symbol:
[open_client.client.events.unregister(_event_type, exchange) for _event_type in event_types]
[open_client.client.events.unregister(
_event_type, exchange, request.trade_id)
for _event_type in event_types
]
client.stream_queue.remove(_queue)
logger.info(f"OnKlinesUpdate: Stop market stream for {open_client.name}:{request.symbol}:"
f"{_intervals}")
return
elif isinstance(_event, events.KlineWrapper):
# logger.info(f"OnKlinesUpdate.event: {exchange}:{_event.symbol}:{_event.kline_interval}")
logger.info(f"OnKlinesUpdate.event: {exchange}:{_event.symbol}:{_event.kline_interval}")
response.symbol = _event.symbol
response.interval = _event.kline_interval
candle = [_event.kline_start_time,
Expand Down Expand Up @@ -539,11 +544,13 @@ async def OnOrderBookUpdate(self, request: api_pb2.MarketRequest,
else:
_symbol = request.symbol.lower()
_event_type = f"{_symbol}@depth5"
client.events.register_event(functools.partial(on_order_book_update, _queue), _event_type, client.exchange)
client.events.register_event(functools.partial(
on_order_book_update, _queue, client, request.trade_id, _event_type),
_event_type, client.exchange)
while True:
_event = await _queue.get()
if isinstance(_event, str) and _event == request.symbol:
open_client.client.events.unregister(_event_type, client.exchange)
client.events.unregister(_event_type, client.exchange, request.trade_id)
client.stream_queue.remove(_queue)
logger.info(f"OnOrderBookUpdate: Stop market stream for {open_client.name}: {request.symbol}")
return
Expand Down Expand Up @@ -721,7 +728,7 @@ async def StartStream(self, request: api_pb2.StartStreamRequest,
_market_stream = open_client.client.events.registered_streams
_market_stream_count = sum([len(_market_stream.get(k)) for k in _market_stream.keys()])

if client.exchange == 'binance':
if client.exchange in ('binance', 'ftx'):
for trade in client.market_data_streams.keys():
await client.stop_market_events_listener(_trade_id=trade)

Expand All @@ -745,10 +752,9 @@ async def StopStream(self, request: api_pb2.MarketRequest,
return response


async def on_klines_update(_queue, event: events.KlineWrapper):
async def on_klines_update(_queue, client, trade_id, _event_type, event: events.KlineWrapper):
# logger.info(f"on_klines_update.event: {event}")
_event = weakref.ref(event)
await _queue.put(_event())
await event_handler(_queue, client, trade_id, _event_type, event)


async def on_order_update(_queue, event: events.OrderUpdateWrapper):
Expand All @@ -763,25 +769,27 @@ async def on_funds_update(_queue, event: events.OutboundAccountPositionWrapper):
await _queue.put(_event())


async def on_ticker_update(_queue, client, trade_id, _event_type, event: events.SymbolMiniTickerWrapper = None):
logger.info(f"on_ticker_update.event: {event.event_type}, {event.symbol}, {event.event_time}, {event.close_price}")
async def on_ticker_update(_queue, client, trade_id, _event_type, event: events.SymbolMiniTickerWrapper):
# logger.info(f"on_ticker_update.event: {event.event_type},{event.symbol},{event.event_time},{event.close_price}")
await event_handler(_queue, client, trade_id, _event_type, event)


async def on_order_book_update(_queue, client, trade_id, _event_type, event: events.PartialBookDepthWrapper):
# logger.info(f"on_order_book_update.event: {event.last_update_id}")
await event_handler(_queue, client, trade_id, _event_type, event)


async def event_handler(_queue, client, trade_id, _event_type, event):
_event = weakref.ref(event)
try:
_queue.put_nowait(_event())
# await _queue.put(_event())
except asyncio.QueueFull:
logger.info("asyncio.QueueFull")
logger.warning(f"For {_event_type} asyncio.QueueFull and wold be closed")
client.events.unregister(_event_type, client.exchange, trade_id)
await client.stop_market_events_listener(trade_id)
client.stream_queue.remove(_queue)


async def on_order_book_update(_queue, event: events.PartialBookDepthWrapper):
# logger.info(f"on_order_book_update.event: {event.last_update_id}")
_event = weakref.ref(event)
await _queue.put(_event())


def is_port_in_use(port: int) -> bool:
import socket
# with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s:
Expand Down
1 change: 0 additions & 1 deletion exchanges_wrapper/web_sockets.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ async def start_wss(self):
self.web_socket = await self.session.ws_connect(f"{self.endpoint}/stream?streams={combined_streams}",
proxy=self.client.proxy)
logger.info(f"Combined events stream started: {combined_streams}")
logger.info(f"start_wss.handlers: {self.client.events.handlers}")
await self._handle_messages(self.web_socket)
else:
symbol = self.channel.split('@')[0]
Expand Down

0 comments on commit 04ae21f

Please sign in to comment.