Skip to content

Commit

Permalink
1.2.4-18 fixed #2, fixed #3
Browse files Browse the repository at this point in the history
  • Loading branch information
DogsTailFarmer committed Sep 18, 2022
1 parent 04ae21f commit d1ecac2
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 182 deletions.
2 changes: 1 addition & 1 deletion exchanges_wrapper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
__contact__ = "https://github.com/DogsTailFarmer"
__email__ = "[email protected]"
__credits__ = ["https://github.com/DanyaSWorlD"]
__version__ = "1.2.4-17"
__version__ = "1.2.4-18"

from pathlib import Path
import shutil
Expand Down
91 changes: 42 additions & 49 deletions exchanges_wrapper/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ def __init__(
self.symbols = {}
self.highest_precision = None
self.rate_limits = None
self.market_data_streams = defaultdict(set)
self.user_data_stream = None
self.data_streams = defaultdict(set)
self.active_orders = {}
self.wss_buffer = {}
self.stream_queue = []
self.stream_queue = defaultdict(set)

async def load(self):
infos = await self.fetch_exchange_info()
Expand Down Expand Up @@ -115,62 +114,56 @@ def events(self):
self._events = Events() # skipcq: PYL-W0201
return self._events

async def start_user_events_listener(self):
logger.info(f"Start '{self.exchange}' user events listener")
self.user_data_stream = None
async def start_user_events_listener(self, _trade_id):
logger.info(f"Start '{self.exchange}' user events listener for {_trade_id}")
user_data_stream = None
if self.exchange == 'binance':
self.user_data_stream = UserEventsDataStream(self, self.endpoint_ws_auth, self.user_agent, self.exchange)
user_data_stream = UserEventsDataStream(self, self.endpoint_ws_auth, self.user_agent, self.exchange)
elif self.exchange == 'ftx':
self.user_data_stream = FtxPrivateEventsDataStream(self,
self.endpoint_ws_auth,
self.user_agent,
self.exchange,
self.sub_account)
user_data_stream = FtxPrivateEventsDataStream(self,
self.endpoint_ws_auth,
self.user_agent,
self.exchange,
self.sub_account)
elif self.exchange == 'bitfinex':
self.user_data_stream = BfxPrivateEventsDataStream(self,
self.endpoint_ws_auth,
self.user_agent,
self.exchange)
if self.user_data_stream:
await self.user_data_stream.start()

async def stop_user_events_listener(self):
if self.user_data_stream:
await self.user_data_stream.stop()
user_data_stream = BfxPrivateEventsDataStream(self, self.endpoint_ws_auth, self.user_agent, self.exchange)
if user_data_stream:
self.data_streams[_trade_id] |= {user_data_stream}
await user_data_stream.start()

async def start_market_events_listener(self, _trade_id):
_events = self.events.registered_streams
_events = self.events.registered_streams.get(self.exchange, dict()).get(_trade_id, set())
start_list = []

for _exchange in _events.keys():
logger.info(f"Start '{_exchange}' market events listener: ({', '.join(_events.get(_exchange))})"
f" for {_trade_id}")
if _exchange == 'binance':
_endpoint = BINANCE_ENDPOINT_WS
market_data_stream = MarketEventsDataStream(self, _endpoint, self.user_agent, _exchange, _trade_id)
self.market_data_streams[_trade_id] |= {market_data_stream}
logger.info(f"Start '{self.exchange}' market events listener: ({', '.join(_events)}) for {_trade_id}")

if self.exchange == 'binance':
_endpoint = BINANCE_ENDPOINT_WS
market_data_stream = MarketEventsDataStream(self, _endpoint, self.user_agent, self.exchange, _trade_id)
self.data_streams[_trade_id] |= {market_data_stream}
start_list.append(market_data_stream.start())
else:
_endpoint = self.endpoint_ws_public
for channel in _events:
market_data_stream = MarketEventsDataStream(self,
_endpoint,
self.user_agent,
self.exchange,
_trade_id,
channel)
self.data_streams[_trade_id] |= {market_data_stream}
start_list.append(market_data_stream.start())
else:
_endpoint = self.endpoint_ws_public
for channel in self.events.registered_streams.get(_exchange):
market_data_stream = MarketEventsDataStream(self,
_endpoint,
self.user_agent,
_exchange,
_trade_id,
channel)
self.market_data_streams[_trade_id] |= {market_data_stream}
start_list.append(market_data_stream.start())
logger.info(f"start_market_events_listener.market_data_streams: {self.market_data_streams}")
await asyncio.gather(*start_list, return_exceptions=True)

async def stop_market_events_listener(self, _trade_id):
logger.info(f"stop_market_events_listener.market_data_streams: {self.market_data_streams} for {_trade_id}")
stop_list = []
for market_data_stream in self.market_data_streams.get(_trade_id, []):
stop = market_data_stream.stop()
stop_list.append(stop)
await asyncio.gather(*stop_list, return_exceptions=False)
async def stop_events_listener(self, _trade_id):
logger.info(f"stop_events_listener.data_streams: {self.data_streams} for {_trade_id}")
stopped_data_stream = self.data_streams.get(_trade_id, set()).copy()
for data_stream in stopped_data_stream:
await data_stream.stop()
self.data_streams.get(_trade_id, set()).discard(data_stream)
if not self.data_streams.get(_trade_id, 1):
self.data_streams.pop(_trade_id, None)
logger.info(f"2 stop_events_listener.data_streams: {self.data_streams} for {_trade_id}")

def assert_symbol_exists(self, symbol):
if self.loaded and symbol not in self.symbols:
Expand Down
52 changes: 18 additions & 34 deletions exchanges_wrapper/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,50 +30,34 @@ def __repr__(self):
class Events:
def __init__(self):
self.handlers = defaultdict(Handlers)
self.registered_streams = defaultdict(set)
self.registered_streams = defaultdict(lambda: defaultdict(set))

def register_user_event(self, listener, event_type):
self.handlers[event_type].append(listener)
# print(f"register_user_event.handlers: {self.handlers}")

def unregister_user_event(self, event_type):
self.handlers.pop(event_type)
# print(f"unregister_user_event.handlers: {self.handlers}")

def register_event(self, listener, event_type, exchange):
def register_event(self, listener, event_type, exchange, trade_id):
logger.info(f"register: event_type: {event_type}, exchange: {exchange}")
self.registered_streams[exchange] |= {event_type}
self.registered_streams[exchange][trade_id] |= {event_type}
if exchange == 'ftx':
event_type = f"{event_type.split('@')[0].replace('/', '').lower()}@{event_type.split('@')[1]}"
elif exchange == 'bitfinex':
event_type = f"{event_type.split('@')[0][1:].replace(':', '').lower()}@{event_type.split('@')[1]}"
self.handlers[event_type].append(listener)
'''
a = self.handlers.get(event_type)
print(dir(a))
print(type(a[0]))
print(dir(a[0]))
print(a[0])
print(a[0].args)
print(a[0].args[2])
'''

def unregister(self, event_type, exchange, trade_id):
logger.info(f"unregister: event_type: {event_type}, exchange: {exchange}")

logger.info(f"unregister: registered_streams: {self.registered_streams}")
logger.info(f"unregister: handlers: {self.handlers}")

_event_type = f"{event_type.split('@')[0].replace('/', '').lower()}@{event_type.split('@')[1]}"
_handlers = self.handlers.pop(_event_type, [])
_handlers[:] = [i for i in _handlers if i.args[2] != trade_id]
if _handlers:
self.handlers.update({_event_type: _handlers})
else:
self.registered_streams[exchange].discard(event_type)

logger.info(f"unregister: registered_streams: {self.registered_streams}")
logger.info(f"unregister: handlers: {self.handlers}")

def unregister(self, exchange, trade_id):
logger.info(f"Unregister events for {trade_id}")
_event_types = self.handlers.keys()
unregistered_event_types = []
for _event_type in _event_types:
_handlers = self.handlers.get(_event_type, [])
_handlers[:] = [i for i in _handlers if i.args[2] != trade_id]
if _handlers:
self.handlers.update({_event_type: _handlers})
else:
unregistered_event_types.append(_event_type)
for _event_type in unregistered_event_types:
self.handlers.pop(_event_type, None)
self.registered_streams.get(exchange, dict()).pop(trade_id, None)

def wrap_event(self, event_data):
# print(f"wrap_event.event_data: {event_data}")
Expand Down
Loading

0 comments on commit d1ecac2

Please sign in to comment.