feat: wallet and treasury stats (yearn#159)
* feat: add tx exporter

* feat: add skeleton

* fix: temporarily disable victoria

* feat: add total users

* feat: count wallets by # vaults used

* feat: add total users by vault

* fix: use Decimal instead of float

* feat: use pickle instead of csv for tx cache

* chore: refactor

* chore: checkpoint tx pickle thru block 10949751

* chore: add pandas to requirements.txt

* chore: add env/ to .gitignore

* feat: parallelism

* fix: accept new transfer event field labels

* fix: handle reverts in price magic

* fix: etherscan rate limit handler

* chore: refactor dataframe

* chore: increase dop

* chore: refactor

* chore: checkpoint tx pickle thru block 13398262

* feat: add transactions exporter to docker

* feat: user balances (#2)

* feat: add transactions exporter to docker

* chore: reduce unnecessary printing

* feat: user balances

* feat: merge tx analysis with existing exporter

* chore: wallets not users

* chore: cleanup

* chore: wallets not users

* fix: tvl logger

* chore: refactor

* feat: env variable to skip wallet stats for speed

* chore: cleanup

* fix: skip transfers

* fix: typo

* feat: user balances (#2)

* feat: add transactions exporter to docker

* chore: reduce unnecessary printing

* feat: user balances

* feat: user balances (#2)

* feat: add transactions exporter to docker

* chore: reduce unnecessary printing

* feat: user balances

* feat: treasury exporter

* chore: refactor and minor fixes

* feat: add buckets

* fix: revert curve oracle to use common base token

* fix: curve underlying coins revert for non-metapools

* fix: add details to revert message when can't decode logs

* fix: historical treasury exporter start date

* feat: user balances (#2)

* feat: add transactions exporter to docker

* chore: reduce unnecessary printing

* feat: user balances

* fix: revert curve oracle to use common base token

* fix: curve underlying coins revert for non-metapools

* feat: postgres container

* chore: add pyodbc to requirements

* feat: initial postgres design

* feat: silent kwarg for magic.get_price

* feat: postgres tx caching

* chore: better logging for treasury

* feat: make logs

* chore: add sqlalchemy to requirements

* chore: cleanup reverted changes

* chore: specify containers for make logs

* feat: fetch balances from postgres

* feat: yearn.describe_wallets

* feat: wallet exporter

* chore: sort imports

* chore: formatting

* feat: specify new agg stats

* feat: skip middleware if already setup

* chore: cleanup

* chore: black

* chore: remove unused function

* chore: cleanup

* fix: remove rai from stablecoins

* chore: cleanup

* fix: setup middleware

* chore: revert changes to create_filter

* chore: revert silent kwarg on magic.get_price

* chore: refactor

* feat: prices for one to one wrapped tokens

* feat: add ymechs msig to treasury wallets

* feat: two more yearn wallets

* chore: ignore .crypto domain NFT transfers

Co-authored-by: banteg <[email protected]>
BobTheBuidler and banteg authored Nov 25, 2021
1 parent 5fd098d commit a3b593f
Showing 26 changed files with 1,512 additions and 50 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
@@ -1,3 +1,5 @@
 .git
 secrets
+.env
+env/
 cache/
1 change: 1 addition & 0 deletions .gitignore
@@ -11,6 +11,7 @@ cache
 node_modules/
 static/*
 !static/.gitkeep
+env/
 .env
 secrets/*
 !secrets/.gitkeep
3 changes: 3 additions & 0 deletions Dockerfile
@@ -3,6 +3,9 @@ FROM python:3.8-buster
 RUN mkdir -p /app/yearn-exporter
 WORKDIR /app/yearn-exporter
 
+RUN apt-get update
+RUN apt-get install -y build-essential odbc-postgresql python3-dev unixodbc-dev
+
 ADD requirements.txt ./
 RUN pip3 install -r requirements.txt
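
The apt packages added here (odbc-postgresql, unixodbc-dev) provide the PostgreSQL ODBC driver and headers that the new pyodbc requirement (see requirements.txt below) builds and links against. As a minimal sketch of what a connection through that driver might look like, with hypothetical host, database, and credentials that are not taken from this diff:

```python
import pyodbc

# Hypothetical connection parameters; the real host, database, and
# credentials are supplied by docker-compose / environment variables.
conn = pyodbc.connect(
    "DRIVER={PostgreSQL Unicode};"  # driver name registered by the odbc-postgresql package
    "SERVER=postgres;PORT=5432;"
    "DATABASE=yearn;UID=postgres;PWD=postgres;"
)
cursor = conn.cursor()
cursor.execute("SELECT version()")  # smoke test: driver and server reachable
print(cursor.fetchone()[0])
conn.close()
```
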
3 changes: 3 additions & 0 deletions Makefile
@@ -47,3 +47,6 @@ clean-volumes: dashboards-clean-volumes

 rebuild: down build up
 scratch: clean-volumes build up
+
+logs:
+	$(dashboards_command) logs -f -t yearn-exporter historical-exporter treasury-exporter historical-treasury-exporter transactions-exporter wallet-exporter
6 changes: 1 addition & 5 deletions entrypoint.sh
@@ -5,12 +5,8 @@ NETWORK="mainnet" # default to Mainnet (Infura)
 EXPLORER=${EXPLORER:-https://api.etherscan.io/api}
 
 if [[ ! -z "$WEB3_PROVIDER" ]]; then
   NETWORK="mainnet-custom"
 
-  if [[ ! $(brownie networks list | grep mainnet-custom) ]]; then
-    brownie networks add Ethereum $NETWORK host=$WEB3_PROVIDER chainid=1 explorer=$EXPLORER
-  else
-    brownie networks modify $NETWORK host=$WEB3_PROVIDER chainid=1 explorer=$EXPLORER
-  fi
+  brownie networks modify mainnet host=$WEB3_PROVIDER chainid=1 explorer=$EXPLORER
 fi

12 changes: 7 additions & 5 deletions requirements.txt
@@ -1,5 +1,5 @@
-eth-brownie==1.16.4
-web3==5.23.1
+eth-brownie>=1.16.4
+web3>=5.23.1
 click>=7.1.2
 tabulate>=0.8.7
 joblib>=1.0.1
@@ -12,7 +12,9 @@ tokenlists>=0.1.0b0
 aiofiles>=0.6.0
 semantic-version>=2.8.5
 boto3==1.17.88
-ypricemagic>=0.2.4
-pandas>=1.2.3
 rich>=10.11.0
-matplotlib>=3.4.3
+matplotlib>=3.4.3
+ypricemagic>=0.2.19
+pandas>=1.3.0
+pyodbc>=4.0.32
+sqlalchemy>=1.4.26
17 changes: 8 additions & 9 deletions scripts/historical_exporter.py
@@ -1,18 +1,17 @@
import logging
import math
import os
import time
from datetime import datetime, timedelta, timezone
from itertools import count

import psutil
import requests
from joblib import Parallel, delayed
from toolz import partition_all
from yearn.outputs import victoria
from yearn.utils import closest_block_after_timestamp
from yearn.yearn import Yearn

logger = logging.getLogger('yearn.historical_exporter')

135 changes: 135 additions & 0 deletions scripts/historical_treasury_exporter.py
@@ -0,0 +1,135 @@
import logging
import math
import os
import time
from datetime import datetime, timedelta, timezone
from itertools import count

import psutil
import requests
from brownie import Contract, chain
from joblib import Parallel, delayed
from toolz import partition_all
from yearn.outputs import victoria
from yearn.utils import closest_block_after_timestamp

from yearn.treasury.treasury import Treasury

logger = logging.getLogger('yearn.historical_treasury_exporter')

available_memory = psutil.virtual_memory().available / 1e9  # in GB
default_pool_size = max(1, math.floor(available_memory / 8))  # allocate 8GB per worker
POOL_SIZE = int(os.environ.get("POOL_SIZE", default_pool_size))
CHUNK_SIZE = int(os.environ.get("CHUNK_SIZE", 50))


def main():
    start = datetime.now(tz=timezone.utc)
    # end: 2020-02-12 first treasury tx
    end = datetime(2020, 7, 21, tzinfo=timezone.utc)

    interval_map = [
        {
            'resolution': '1d',
            'start': start.replace(hour=0, minute=0, second=0, microsecond=0),
            'interval': timedelta(days=1),
        },
        {
            'resolution': '1h',
            'start': start.replace(minute=0, second=0, microsecond=0),
            'interval': timedelta(hours=1),
        },
        {
            'resolution': '30m',
            'start': start.replace(minute=0, second=0, microsecond=0),
            'interval': timedelta(minutes=30),
        },
        {
            'resolution': '15m',
            'start': start.replace(minute=0, second=0, microsecond=0),
            'interval': timedelta(minutes=15),
        },
        {
            'resolution': '5m',
            'start': start.replace(minute=0, second=0, microsecond=0),
            'interval': timedelta(minutes=5),
        },
        {
            'resolution': '1m',
            'start': start.replace(second=0, microsecond=0),
            'interval': timedelta(minutes=1),
        },
        {
            'resolution': '30s',
            'start': start.replace(second=0, microsecond=0),
            'interval': timedelta(seconds=30),
        },
        {
            'resolution': '15s',
            'start': start.replace(second=0, microsecond=0),
            'interval': timedelta(seconds=15),
        },
    ]

    resolutions = [item['resolution'] for item in interval_map]
    # default resolution is hourly
    resolution = os.environ.get("RESOLUTION", "1h")
    if resolution not in resolutions:
        resolution = "1h"

    for entry in interval_map:
        intervals = _generate_snapshot_range(entry["start"], end, entry["interval"])

        logger.info("starting new pool with %d workers", POOL_SIZE)
        Parallel(n_jobs=POOL_SIZE, backend="multiprocessing", verbose=100)(
            delayed(_export_chunk)(chunk)
            for chunk in partition_all(CHUNK_SIZE, intervals)
        )

        # if we reached the final resolution we're done
        if entry['resolution'] == resolution:
            break


def _export_chunk(chunk):
    treasury = Treasury()
    for interval in chunk:
        _interval_export(treasury, interval)


def _interval_export(treasury, snapshot):
    start_time = time.time()
    ts = snapshot.timestamp()
    block = closest_block_after_timestamp(ts)
    assert block is not None, "no block after timestamp found"
    treasury.export(block, ts)
    duration = time.time() - start_time
    victoria.export_duration(duration, POOL_SIZE, "historical_treasury", ts)
    logger.info("exported treasury snapshot %s", snapshot)


def _has_data(ts):
    base_url = os.environ.get('VM_URL', 'http://victoria-metrics:8428')
    # query for the treasury_assets metric, which is always present
    url = f'{base_url}/api/v1/query?query=treasury_assets&time={ts}'
    headers = {
        'Connection': 'close',
    }
    with requests.Session() as session:
        response = session.get(url=url, headers=headers)
        result = response.json()
        return result['status'] == 'success' and len(result['data']['result']) > 0


def _generate_snapshot_range(start, end, interval):
    for i in count():
        snapshot = start - i * interval
        if snapshot < end:
            return
        else:
            ts = snapshot.timestamp()
            if _has_data(ts):
                logger.info("data already present for snapshot %s, ts %d", snapshot, ts)
                continue
            else:
                yield snapshot
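
For intuition: the exporter sweeps backward from now toward the first treasury transaction, once per resolution in the interval map, stopping once the configured RESOLUTION has been processed, and skipping any snapshot VictoriaMetrics already has. Below is a self-contained sketch of the backward generator with the _has_data check stubbed out; the dates are illustrative only:

```python
from datetime import datetime, timedelta, timezone
from itertools import count

def generate_snapshot_range(start, end, interval):
    # Mirrors _generate_snapshot_range above, minus the VictoriaMetrics
    # dedup check: walk backward from `start` until we pass `end`.
    for i in count():
        snapshot = start - i * interval
        if snapshot < end:
            return
        yield snapshot

start = datetime(2021, 11, 25, tzinfo=timezone.utc)
end = datetime(2021, 11, 22, tzinfo=timezone.utc)
print(list(generate_snapshot_range(start, end, timedelta(days=1))))
# four daily snapshots: Nov 25, 24, 23, 22
```
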
150 changes: 150 additions & 0 deletions scripts/transactions_exporter.py
@@ -0,0 +1,150 @@
import time
import warnings
from decimal import Decimal
import logging

import pandas as pd
from brownie import ZERO_ADDRESS, Contract, chain, web3
from brownie.exceptions import BrownieEnvironmentWarning
from joblib import Parallel, delayed
import sqlalchemy
from tqdm import tqdm
from web3._utils.abi import filter_by_name
from web3._utils.events import construct_event_topic_set
from yearn.events import create_filter, decode_logs, get_logs_asap
from yearn.outputs.postgres.postgres import postgres
from yearn.prices import magic
from yearn.v1.registry import Registry as RegistryV1
from yearn.v2.registry import Registry as RegistryV2
from ypricemagic.interfaces.ERC20 import ERC20ABI
from yearn.treasury.treasury import Treasury

treasury = Treasury()

warnings.simplefilter("ignore", BrownieEnvironmentWarning)

registryV1 = RegistryV1()
registryV2 = RegistryV2()

logger = logging.getLogger(__name__)


def main():
    for block in chain.new_blocks(height_buffer=1):
        process_and_cache_user_txs(postgres.last_recorded_block('user_txs'))


def active_vaults_at(end_block) -> set:
    v1 = {vault.vault for vault in registryV1.active_vaults_at(end_block)}
    v2 = {vault.vault for vault in registryV2.active_vaults_at(end_block)}
    return v1.union(v2)


def process_and_cache_user_txs(last_saved_block=None):
    max_block_to_cache = (
        chain.height - 50
    )  # We look 50 blocks back to avoid uncles and reorgs
    start_block = last_saved_block + 1 if last_saved_block else None
    end_block = (
        10650000
        if start_block is None
        else start_block + 500
        if start_block + 500 < max_block_to_cache
        else max_block_to_cache
    )
    df = pd.DataFrame()
    for vault in tqdm(active_vaults_at(end_block)):
        df = df.append(get_token_transfers(vault, start_block, end_block))
    df = df.rename(columns={'token': 'vault'})
    df.to_sql('user_txs', postgres.sqla_engine, if_exists='append', index=False)
    print(f'user txs batch {start_block}-{end_block} exported to postgres')


# Helper functions
def get_token_transfers(token, start_block, end_block) -> pd.DataFrame:
    topics = construct_event_topic_set(
        filter_by_name('Transfer', token.abi)[0],
        web3.codec,
    )
    postgres.cache_token(token.address)
    decimals = Contract(token.address).decimals()
    events = decode_logs(
        get_logs_asap(token.address, topics, from_block=start_block, to_block=end_block)
    )
    return pd.DataFrame(
        Parallel(1, 'threading')(
            delayed(_process_transfer_event)(event, token, decimals) for event in events
        )
    )


def _process_transfer_event(event, vault, decimals) -> dict:
    sender, receiver, amount = event.values()
    postgres.cache_address(sender)
    postgres.cache_address(receiver)
    price = _get_price(event, vault)
    if (
        vault.address == '0x7F83935EcFe4729c4Ea592Ab2bC1A32588409797'
        and event.block_number == 12869164
    ):
        # NOTE magic.get_price() returns erroneous price due to erroneous ppfs
        price = 99999
    if price > 100000:
        logger.warn(f'token: {vault.address}')
        logger.warn(f'price: {price}')
        logger.warn(f'block: {event.block_number}')
    txhash = event.transaction_hash.hex()
    return {
        'chainid': chain.id,
        'block': event.block_number,
        'timestamp': chain[event.block_number].timestamp,
        'hash': txhash,
        'log_index': event.log_index,
        'token': vault.address,
        'type': _event_type(sender, receiver, vault.address),
        'from': sender,
        'to': receiver,
        'amount': Decimal(amount) / Decimal(10 ** decimals),
        'price': price,
        'value_usd': Decimal(amount) / Decimal(10 ** decimals) * Decimal(price),
        'gas_used': web3.eth.getTransactionReceipt(txhash).gasUsed,
        'gas_price': web3.eth.getTransaction(
            txhash
        ).gasPrice,  # * 1.0 # force pandas to insert this as decimal not bigint
    }


def _get_price(event, vault):
    while True:
        try:
            try:
                return magic.get_price(vault.address, event.block_number)
            except TypeError:  # magic.get_price fails because all liquidity was removed for testing and `share_price` returns None
                return magic.get_price(vault.token(), event.block_number)
        except ConnectionError as e:
            # Try again
            print(f'ConnectionError: {str(e)}')
            time.sleep(1)
        except ValueError as e:
            print(f'ValueError: {str(e)}')
            if str(e) in [
                "Failed to retrieve data from API: {'status': '0', 'message': 'NOTOK', 'result': 'Max rate limit reached'}",
                "Failed to retrieve data from API: {'status': '0', 'message': 'NOTOK', 'result': 'Max rate limit reached, please use API Key for higher rate limit'}",
            ]:
                # Try again
                print('trying again...')
                time.sleep(5)
            else:
                print(f'vault: {vault.address}')
                raise Exception(str(e))


def _event_type(sender, receiver, vault_address) -> str:
    if sender == ZERO_ADDRESS:
        return 'deposit'
    elif receiver == ZERO_ADDRESS:
        return 'withdrawal'
    elif sender == vault_address:
        return 'v2 fees'
    else:
        return 'transfer'
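
Once batches land in the user_txs table, the wallet stats can read them back through the same SQLAlchemy engine the exporter writes with. A hypothetical query sketch; the column names ('vault', 'type', 'value_usd') match the DataFrame built above, but the aggregation itself is illustrative and not part of this commit:

```python
import pandas as pd
from yearn.outputs.postgres.postgres import postgres

# Illustrative: total USD deposit volume per vault, read back from
# the user_txs table populated by process_and_cache_user_txs above.
df = pd.read_sql(
    "SELECT vault, SUM(value_usd) AS deposited"
    " FROM user_txs WHERE type = 'deposit' GROUP BY vault",
    postgres.sqla_engine,
)
print(df.sort_values('deposited', ascending=False).head())
```
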

