Skip to content

Commit

Permalink
add option to credit incoming erc20 transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
gakonst committed Jul 22, 2019
1 parent 7224f32 commit a81f828
Showing 1 changed file with 82 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::types::{Addresses, EthereumAccount, EthereumLedgerTxSigner, EthereumStore};
use super::utils::{make_tx, sent_to_us};
use super::utils::{make_tx, sent_to_us, transfer_logs, ERC20Transfer};
use clarity::Signature;
use log::{debug, error, trace};
use sha3::{Digest, Keccak256 as Sha3};
Expand Down Expand Up @@ -256,7 +256,9 @@ where
let web3 = self.web3.clone();
let store = self.store.clone();
let store_clone = self.store.clone();
let self_clone = self.clone();
let self_clone2 = self.clone();
let our_address = self.address.own_address;
let token_address = self.address.token_address;

// get the current block number
web3.eth()
Expand All @@ -275,14 +277,43 @@ where
"Will fetch txs from block {} until {}",
last_observed_block, fetch_until
);
let checked_blocks = last_observed_block.low_u64()..=fetch_until.low_u64();
// for each block create a future which will notify the
// connector about all the transactions in that block that are sent to our account
let submit_all_txs_fut =
checked_blocks.map(move |block_num| self_clone.submit_txs_in_block(block_num));

// combine all the futures for that range of blocks
(Ok(fetch_until), join_all(submit_all_txs_fut))

// if the engine uses ERC20 tokens then use eth_getLogs filters
let submit_txs_future = if let Some(token_address) = token_address {
debug!("Settling for ERC20 transactions");
// get all erc20 transactions
let submit_all_erc20_txs_fut = transfer_logs(
web3.clone(),
token_address,
None,
Some(our_address),
BlockNumber::Number(last_observed_block.low_u64()),
BlockNumber::Number(fetch_until.low_u64()),
)
.and_then(move |transfers: Vec<ERC20Transfer>| {
// map each incoming erc20 transactions to an outgoing
// notification to the connector
Ok(transfers.into_iter().map(move |transfer| {
self_clone2.submit_erc20_transfer(transfer, token_address)
}))
});

// combine all erc20 futures for that range of blocks
Either::A(submit_all_erc20_txs_fut)
} else {
debug!("Settling for ETH transactions");
// otherwise we have to filter through each block's transactions manually
let checked_blocks = last_observed_block.low_u64()..=fetch_until.low_u64();
// for each block create a future which will notify the
// connector about all the transactions in that block that are sent to our account
let submit_all_txs_fut = checked_blocks
.map(move |block_num| self_clone2.submit_txs_in_block(block_num));

// combine all the eth futures for that range of blocks
Either::B(submit_all_txs_fut)
};

Ok((fetch_until, submit_txs_future))
})
.and_then(move |(fetch_until, _res)| {
// now that all transactions have been processed successfully, we
Expand All @@ -291,14 +322,54 @@ where
})
}

/// Submits an ERC20 transfer object's data to the connector
// todo: Try combining the body of this function with `submit_tx_to_connector`
fn submit_erc20_transfer(
&self,
transfer: ERC20Transfer,
token_address: Address,
) -> impl Future<Item = (), Error = ()> {
let store = self.store.clone();
let tx_hash = transfer.tx_hash;
let self_clone = self.clone();
let addr = Addresses {
own_address: transfer.from,
token_address: Some(token_address),
};
let amount = transfer.amount;
store
.check_tx_credited(transfer.tx_hash)
.map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash))
.and_then(move |credited| {
if !credited {
Either::A(
store
.load_account_id_from_address(addr)
.and_then(move |id| {
self_clone.notify_connector(id.to_string(), amount.low_u64())
})
.and_then(move |_| {
// only save the transaction hash if the connector
// was successfully notified
store.credit_tx(tx_hash)
}),
)
} else {
Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction
}
})
}

fn submit_txs_in_block(&self, block_number: u64) -> impl Future<Item = (), Error = ()> {
debug!("Getting txs for block {}", block_number);
let self_clone = self.clone();
// Get the block at `block_number`
self.web3
.eth()
.block(BlockNumber::Number(block_number as u64).into())
.map_err(move |err| error!("Got error while getting block {}: {:?}", block_number, err))
.and_then(move |maybe_block| {
debug!("Got maybe block {:?}", maybe_block);
// Error out if the block was not found (unlikely to occur since we're only
// calling this for past blocks)
if let Some(block) = maybe_block {
Expand Down Expand Up @@ -328,6 +399,7 @@ where
}

fn submit_tx_to_connector(&self, tx_hash: H256) -> impl Future<Item = (), Error = ()> {
debug!("in submit_tx_to_connector for tx: {:?}", tx_hash);
let our_address = self.address.own_address;
let web3 = self.web3.clone();
let store = self.store.clone();
Expand Down

0 comments on commit a81f828

Please sign in to comment.