diff --git a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs index b0a0ce111..bf9f6beda 100644 --- a/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs +++ b/crates/interledger-settlement-engines/src/engines/ethereum_ledger/eth_engine.rs @@ -1,5 +1,5 @@ use super::types::{Addresses, EthereumAccount, EthereumLedgerTxSigner, EthereumStore}; -use super::utils::{make_tx, sent_to_us}; +use super::utils::{make_tx, sent_to_us, transfer_logs, ERC20Transfer}; use clarity::Signature; use log::{debug, error, trace}; use sha3::{Digest, Keccak256 as Sha3}; @@ -256,7 +256,9 @@ where let web3 = self.web3.clone(); let store = self.store.clone(); let store_clone = self.store.clone(); - let self_clone = self.clone(); + let self_clone2 = self.clone(); + let our_address = self.address.own_address; + let token_address = self.address.token_address; // get the current block number web3.eth() @@ -275,14 +277,43 @@ where "Will fetch txs from block {} until {}", last_observed_block, fetch_until ); - let checked_blocks = last_observed_block.low_u64()..=fetch_until.low_u64(); - // for each block create a future which will notify the - // connector about all the transactions in that block that are sent to our account - let submit_all_txs_fut = - checked_blocks.map(move |block_num| self_clone.submit_txs_in_block(block_num)); - - // combine all the futures for that range of blocks - (Ok(fetch_until), join_all(submit_all_txs_fut)) + + // if the engine uses ERC20 tokens then use eth_getLogs filters + let submit_txs_future = if let Some(token_address) = token_address { + debug!("Settling for ERC20 transactions"); + // get all erc20 transactions + let submit_all_erc20_txs_fut = transfer_logs( + web3.clone(), + token_address, + None, + Some(our_address), + BlockNumber::Number(last_observed_block.low_u64()), + BlockNumber::Number(fetch_until.low_u64()), + ) + .and_then(move |transfers: Vec| { + // map each incoming erc20 transactions to an outgoing + // notification to the connector + Ok(transfers.into_iter().map(move |transfer| { + self_clone2.submit_erc20_transfer(transfer, token_address) + })) + }); + + // combine all erc20 futures for that range of blocks + Either::A(submit_all_erc20_txs_fut) + } else { + debug!("Settling for ETH transactions"); + // otherwise we have to filter through each block's transactions manually + let checked_blocks = last_observed_block.low_u64()..=fetch_until.low_u64(); + // for each block create a future which will notify the + // connector about all the transactions in that block that are sent to our account + let submit_all_txs_fut = checked_blocks + .map(move |block_num| self_clone2.submit_txs_in_block(block_num)); + + // combine all the eth futures for that range of blocks + Either::B(submit_all_txs_fut) + }; + + Ok((fetch_until, submit_txs_future)) }) .and_then(move |(fetch_until, _res)| { // now that all transactions have been processed successfully, we @@ -291,7 +322,46 @@ where }) } + /// Submits an ERC20 transfer object's data to the connector + // todo: Try combining the body of this function with `submit_tx_to_connector` + fn submit_erc20_transfer( + &self, + transfer: ERC20Transfer, + token_address: Address, + ) -> impl Future { + let store = self.store.clone(); + let tx_hash = transfer.tx_hash; + let self_clone = self.clone(); + let addr = Addresses { + own_address: transfer.from, + token_address: Some(token_address), + }; + let amount = transfer.amount; + store + .check_tx_credited(transfer.tx_hash) + .map_err(move |_| error!("Error when querying store about transaction: {:?}", tx_hash)) + .and_then(move |credited| { + if !credited { + Either::A( + store + .load_account_id_from_address(addr) + .and_then(move |id| { + self_clone.notify_connector(id.to_string(), amount.low_u64()) + }) + .and_then(move |_| { + // only save the transaction hash if the connector + // was successfully notified + store.credit_tx(tx_hash) + }), + ) + } else { + Either::B(ok(())) // return an empty future otherwise since we want to skip this transaction + } + }) + } + fn submit_txs_in_block(&self, block_number: u64) -> impl Future { + debug!("Getting txs for block {}", block_number); let self_clone = self.clone(); // Get the block at `block_number` self.web3 @@ -299,6 +369,7 @@ where .block(BlockNumber::Number(block_number as u64).into()) .map_err(move |err| error!("Got error while getting block {}: {:?}", block_number, err)) .and_then(move |maybe_block| { + debug!("Got maybe block {:?}", maybe_block); // Error out if the block was not found (unlikely to occur since we're only // calling this for past blocks) if let Some(block) = maybe_block { @@ -328,6 +399,7 @@ where } fn submit_tx_to_connector(&self, tx_hash: H256) -> impl Future { + debug!("in submit_tx_to_connector for tx: {:?}", tx_hash); let our_address = self.address.own_address; let web3 = self.web3.clone(); let store = self.store.clone();