diff --git a/fastlane_bot/config/constants.py b/fastlane_bot/config/constants.py index ad6641ac0..ad855d546 100644 --- a/fastlane_bot/config/constants.py +++ b/fastlane_bot/config/constants.py @@ -34,3 +34,16 @@ SECTA_V3_NAME = "secta_v3" METAVAULT_V3_NAME = "metavault_v3" ZERO_ADDRESS = "0x0000000000000000000000000000000000000000" + +BLOCK_CHUNK_SIZE_MAP = { + "ethereum": 0, + "polygon": 0, + "polygon_zkevm": 0, + "arbitrum_one": 0, + "optimism": 0, + "coinbase_base": 0, + "fantom": 5000, + "mantle": 0, + "linea": 0, + "sei": 0, +} diff --git a/fastlane_bot/events/event_gatherer.py b/fastlane_bot/events/event_gatherer.py index 1659f5807..30b7bf7f3 100644 --- a/fastlane_bot/events/event_gatherer.py +++ b/fastlane_bot/events/event_gatherer.py @@ -1,12 +1,13 @@ +import asyncio from itertools import chain -from typing import Dict +from typing import Dict, List -import asyncio import nest_asyncio from web3 import AsyncWeb3 from web3.contract import Contract +from fastlane_bot.config.constants import BLOCK_CHUNK_SIZE_MAP from .interfaces.subscription import Subscription from .exchanges.base import Exchange @@ -21,6 +22,7 @@ class EventGatherer: def __init__( self, + blockchain: str, w3: AsyncWeb3, exchanges: Dict[str, Exchange], event_contracts: Dict[str, Contract], @@ -30,6 +32,7 @@ def __init__( manager: The Manager object w3: The connected AsyncWeb3 object. """ + self._blockchain = blockchain self._w3 = w3 self._subscriptions = [] unique_topics = set() @@ -48,16 +51,49 @@ def get_all_events(self, from_block: int, to_block: int): from_block_ = 0 else: from_block_ = from_block - coroutines.append(self._get_events_for_topic(from_block_, to_block, sub)) + coroutines.append(self._get_events_for_subscription(from_block_, to_block, sub)) results = asyncio.get_event_loop().run_until_complete(asyncio.gather(*coroutines)) return list(chain.from_iterable(results)) - async def _get_events_for_topic(self, from_block: int, to_block: int, subscription: Subscription): - events = await self._w3.eth.get_logs(filter_params={ - "fromBlock": from_block, - "toBlock": to_block, - "topics": [subscription.topic] - }) - return [subscription.parse_log(event) for event in events] + async def _get_events_for_subscription(self, from_block: int, to_block: int, subscription: Subscription): + return [subscription.parse_log(log) for log in await self._get_logs_for_topics(from_block, to_block, [subscription.topic])] + + async def _get_logs_for_topics(self, from_block: int, to_block: int, topics: List[str]): + chunk_size = BLOCK_CHUNK_SIZE_MAP[self._blockchain] + if chunk_size > 0: + return await self._get_logs_iterative(from_block, to_block, topics, chunk_size) + else: + return await self._get_logs_recursive(from_block, to_block, topics) + async def _get_logs_iterative(self, from_block: int, to_block: int, topics: List[str], chunk_size: int) -> list: + block_numbers = list(range(from_block, to_block + 1, chunk_size)) + [to_block + 1] + log_lists = await asyncio.gather([ + self._w3.eth.get_logs(filter_params={ + "fromBlock": r[0], + "toBlock": r[1], + "topics": topics + }) + for r in zip(block_numbers, map(lambda n: n - 1, block_numbers[1:])) + ]) + return [log for log_list in log_lists for log in log_list] + async def _get_logs_recursive(self, from_block: int, to_block: int, topics: List[str]) -> list: + if from_block <= to_block: + try: + return await self._w3.eth.get_logs(filter_params={ + "fromBlock": from_block, + "toBlock": to_block, + "topics": topics + }) + except Exception as e: + assert "eth_getLogs" in str(e), str(e) + if from_block < to_block: + mid_block = (from_block + to_block) // 2 + log_lists = await asyncio.gather( + self._get_logs_recursive(from_block, mid_block, topics), + self._get_logs_recursive(mid_block + 1, to_block, topics) + ) + return [log for log_list in log_lists for log in log_list] + else: + raise e + raise Exception(f"Illegal log query range: {from_block} -> {to_block}") diff --git a/main.py b/main.py index fcc722efe..722539278 100644 --- a/main.py +++ b/main.py @@ -306,7 +306,12 @@ def run(mgr, args, tenderly_uri=None) -> None: mainnet_uri = mgr.cfg.w3.provider.endpoint_uri handle_static_pools_update(mgr) - event_gatherer = EventGatherer(w3=mgr.w3_async, exchanges=mgr.exchanges, event_contracts=mgr.event_contracts) + event_gatherer = EventGatherer( + blockchain=mgr.cfg.network.NETWORK, + w3=mgr.w3_async, + exchanges=mgr.exchanges, + event_contracts=mgr.event_contracts + ) pool_finder = PoolFinder( carbon_forks=mgr.cfg.network.CARBON_V1_FORKS, diff --git a/run_blockchain_terraformer.py b/run_blockchain_terraformer.py index 854119d40..df514dbee 100644 --- a/run_blockchain_terraformer.py +++ b/run_blockchain_terraformer.py @@ -680,11 +680,14 @@ def get_events_recursive(get_logs: any, start_block: int, end_block: int) -> lis return get_logs(fromBlock=start_block, toBlock=end_block) except Exception as e: assert "eth_getLogs" in str(e), str(e) - mid_block = (start_block + end_block) // 2 - event_list_1 = get_events_recursive(get_logs, start_block, mid_block) - event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block) - return event_list_1 + event_list_2 - return [] + if start_block < end_block: + mid_block = (start_block + end_block) // 2 + event_list_1 = get_events_recursive(get_logs, start_block, mid_block) + event_list_2 = get_events_recursive(get_logs, mid_block + 1, end_block) + return event_list_1 + event_list_2 + else: + raise e + raise Exception(f"Illegal log query range: {start_block} -> {end_block}") def get_uni_v3_pools(