Skip to content

Commit

Permalink
Ethereum Settlement Engine improvements (#150)
Browse files Browse the repository at this point in the history
* fix(store): only save the Ethereum block number in the recently observed data

We cannot assume that the operator's balance will keep increasing, since they may transfer funds to another address

* refactor(eth-se): Improve notifier readability and robustness

- use join_all to avoid callback hell
- avoid using spawn's
- mark a transaction as credited only after the call to the connector was successful
- save the last observed block only after all calls in that block range has been successfully credited

* Use pending in eth_TransactionCount call to take into account transactions which are already in the mempool

Allows the removal of the nonce middleware

* improvement(eth-se): Estimate gas and gas price before sending transaction

* improve redis keys naming

* feat(eth-se): Add filter for ERC20 transactions

* fix(eth-se): Sign the address along with the challenge

Otherwise we would be signing any message that gets sent to us and that is insecure

* (api ) FixedInterval retry when creating new account

Otherwise it may take too long for the connector to ping its engine to get the address from its peer

* improvement(eth-se): Use tx hash as an idempotency key

This ensures the same transaction is never processed twice by the connector

(also minor improvements in variable/function naming

* remove apply_outgoing_settlement function

Adding it resulted in incorerct behavior the balance is already lowered in the PROCESS_FULFILL script. With it,  the balance was lowered by 2x the amount it should.

Note that the reason the balance is lowered first is because we don't want two packets fulfilled simultaneously to trigger settlements for the same amount_to_settle. By reducing the balance by the amount_to_settle before making the settlement client send the request to the settlement engine, this ensures that the settlement will only happen once. The refund_settlement is there in case that call to the SE fails and we need to revert the change to the balance.
  • Loading branch information
gakonst authored Jul 25, 2019
1 parent 76c0db9 commit 5ba5e80
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 350 deletions.
4 changes: 2 additions & 2 deletions crates/interledger-api/src/routes/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reqwest::r#async::Client;
use serde::Serialize;
use serde_json::{json, Value};
use std::str::FromStr;
use tokio_retry::{strategy::ExponentialBackoff, Retry};
use tokio_retry::{strategy::FixedInterval, Retry};
use tower_web::{impl_web, Response};
use url::Url;

Expand Down Expand Up @@ -108,7 +108,7 @@ impl_web! {
}
})
};
Retry::spawn(ExponentialBackoff::from_millis(10).take(MAX_RETRIES), action)
Retry::spawn(FixedInterval::from_millis(2000).take(MAX_RETRIES), action)
.map_err(|_| Response::builder().status(500).body(()).unwrap())
.and_then(move |_| {
Ok(json!(account))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ pub struct TestStore {
#[allow(clippy::all)]
pub cache: Arc<RwLock<HashMap<String, (StatusCode, String, [u8; 32])>>>,
pub last_observed_block: Arc<RwLock<U256>>,
pub last_observed_balance: Arc<RwLock<U256>>,
pub saved_hashes: Arc<RwLock<HashMap<H256, bool>>>,
pub cache_hits: Arc<RwLock<u64>>,
}
Expand Down Expand Up @@ -100,25 +99,17 @@ impl EthereumStore for TestStore {
Box::new(ok(v))
}

fn save_recently_observed_data(
fn save_recently_observed_block(
&self,
block: U256,
balance: U256,
) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let mut guard = self.last_observed_block.write();
*guard = block;
let mut guard = self.last_observed_balance.write();
*guard = balance;
Box::new(ok(()))
}

fn load_recently_observed_data(
&self,
) -> Box<dyn Future<Item = (U256, U256), Error = ()> + Send> {
Box::new(ok((
*self.last_observed_block.read(),
*self.last_observed_balance.read(),
)))
fn load_recently_observed_block(&self) -> Box<dyn Future<Item = U256, Error = ()> + Send> {
Box::new(ok(*self.last_observed_block.read()))
}

fn load_account_id_from_address(
Expand All @@ -135,16 +126,24 @@ impl EthereumStore for TestStore {
Box::new(ok(d))
}

fn check_tx_credited(&self, tx_hash: H256) -> Box<dyn Future<Item = bool, Error = ()> + Send> {
let mut hashes = self.saved_hashes.write();
// if hash exists error, else store it
fn check_if_tx_processed(
&self,
tx_hash: H256,
) -> Box<dyn Future<Item = bool, Error = ()> + Send> {
let hashes = self.saved_hashes.read();
// if hash exists then return error
if hashes.get(&tx_hash).is_some() {
Box::new(ok(true))
} else {
(*hashes).insert(tx_hash, true);
Box::new(ok(false))
}
}

fn mark_tx_processed(&self, tx_hash: H256) -> Box<dyn Future<Item = (), Error = ()> + Send> {
let mut hashes = self.saved_hashes.write();
(*hashes).insert(tx_hash, true);
Box::new(ok(()))
}
}

impl AccountStore for TestStore {
Expand Down Expand Up @@ -235,7 +234,6 @@ impl TestStore {
address_to_id: Arc::new(RwLock::new(address_to_id)),
cache: Arc::new(RwLock::new(HashMap::new())),
cache_hits: Arc::new(RwLock::new(0)),
last_observed_balance: Arc::new(RwLock::new(U256::from(0))),
last_observed_block: Arc::new(RwLock::new(U256::from(0))),
saved_hashes: Arc::new(RwLock::new(HashMap::new())),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,15 @@ pub trait EthereumStore {
account_ids: Vec<<Self::Account as Account>::AccountId>,
) -> Box<dyn Future<Item = Vec<Addresses>, Error = ()> + Send>;

/// Saves the latest block number and account balance, up to which all
/// Saves the latest block number, up to which all
/// transactions have been communicated to the connector
fn save_recently_observed_data(
fn save_recently_observed_block(
&self,
block: U256,
balance: U256,
) -> Box<dyn Future<Item = (), Error = ()> + Send>;

/// Loads the latest saved block number and account balance
fn load_recently_observed_data(
&self,
) -> Box<dyn Future<Item = (U256, U256), Error = ()> + Send>;
/// Loads the latest saved block number
fn load_recently_observed_block(&self) -> Box<dyn Future<Item = U256, Error = ()> + Send>;

/// Retrieves the account id associated with the provided addresses pair.
/// Note that an account with the same `own_address` but different ERC20
Expand All @@ -68,9 +65,15 @@ pub trait EthereumStore {
eth_address: Addresses,
) -> Box<dyn Future<Item = <Self::Account as Account>::AccountId, Error = ()> + Send>;

/// Errors out if the transaction hash has already been stored before,
/// otherwise saves the transaction hash in the store.
fn check_tx_credited(&self, tx_hash: H256) -> Box<dyn Future<Item = bool, Error = ()> + Send>;
/// Returns true if the transaction has already been processed and saved in
/// the store.
fn check_if_tx_processed(
&self,
tx_hash: H256,
) -> Box<dyn Future<Item = bool, Error = ()> + Send>;

/// Saves the transaction hash in the store.
fn mark_tx_processed(&self, tx_hash: H256) -> Box<dyn Future<Item = (), Error = ()> + Send>;
}

/// Implement this trait for datatypes which can be used to sign an Ethereum
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
use ethabi::Token;
use ethereum_tx_sign::{
web3::types::{Address, Transaction, U256},
web3::{
api::Web3,
futures::future::Future,
transports::Http,
types::{Address, BlockNumber, FilterBuilder, Transaction, H160, H256, U256},
},
RawTransaction,
};
use log::error;
use std::str::FromStr;

/// This is the result of keccak256("Transfer(address,address,to)"), which is
/// used to filter through Ethereum ERC20 Transfer events in transaction receipts.
const TRANSFER_EVENT_FILTER: &str =
"ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";

// Helper function which is used to construct an Ethereum transaction sending
// `value` tokens to `to`. The account's nonce is required since Ethereum uses
// an account based model with nonces for replay protection. If a
// `token_address` is provided, then an ERC20 transaction is created instead for
// that token. Ethereum transactions cost 21000 Gas, while ERC20 transactions
// cost at most 70k (can tighten the gas limit, but 70k is safe if the address
// is indeed an ERC20 token.
// TODO: pass it gas_price as a parameter which is calculated from `web3.eth().gas_price()`
pub fn make_tx(
to: Address,
value: U256,
nonce: U256,
token_address: Option<Address>,
) -> RawTransaction {
// `value` tokens to `to`. If a `token_address` is provided, then an ERC20
// transaction is created instead for that token. The `nonce`, `gas` and
// `gas_price` fields are set to 0 and are expected to be set with the values
// returned by the corresponding `eth_getTransactionCount`, `eth_estimateGas`,
// `eth_gasPrice` calls to an Ethereum node.
pub fn make_tx(to: Address, value: U256, token_address: Option<Address>) -> RawTransaction {
if let Some(token_address) = token_address {
// Ethereum contract transactions format:
// [transfer function selector][`to` padded ][`value` padded]
Expand All @@ -29,26 +34,97 @@ pub fn make_tx(
data.extend(ethabi::encode(&[Token::Address(to), Token::Uint(value)]));
RawTransaction {
to: Some(token_address),
nonce,
nonce: U256::from(0),
data,
gas: 70000.into(), // ERC20 transactions cost approximately 40k gas.
gas_price: 20000.into(),
gas: U256::from(0),
gas_price: U256::from(0),
value: U256::zero(),
}
} else {
// Ethereum account transaction:
// The receiver is `to`, and the data field is left empty.
RawTransaction {
to: Some(to),
nonce,
nonce: U256::from(0),
data: vec![],
gas: 21000.into(),
gas_price: 20000.into(),
gas: U256::from(0),
gas_price: U256::from(0),
value,
}
}
}

#[derive(Clone, Copy, Debug)]
pub struct ERC20Transfer {
pub tx_hash: H256,
pub from: Address,
pub to: Address,
pub amount: U256,
}

/// Filters out transactions where the `from` and `to` fields match the provides
/// addreses.
pub fn transfer_logs(
web3: Web3<Http>,
contract_address: Address,
from: Option<Address>,
to: Option<Address>,
from_block: BlockNumber,
to_block: BlockNumber,
) -> impl Future<Item = Vec<ERC20Transfer>, Error = ()> {
let from = if let Some(from) = from {
Some(vec![H256::from(from)])
} else {
None
};
let to = if let Some(to) = to {
Some(vec![H256::from(to)])
} else {
None
};

// create a filter for Transfer events from `from_block` until `to_block
// that filters all events indexed by `from` and `to`.
let filter = FilterBuilder::default()
.from_block(from_block)
.to_block(to_block)
.address(vec![contract_address])
.topics(
Some(vec![H256::from(
// keccak256("transfer(address,address,to)")
TRANSFER_EVENT_FILTER,
)]),
from,
to,
None,
)
.build();

// Make an eth_getLogs call to the Ethereum node
web3.eth()
.logs(filter)
.map_err(move |err| error!("Got error when fetching transfer logs{:?}", err))
.and_then(move |logs| {
let mut ret = Vec::new();
for log in logs {
// NOTE: From/to are indexed events.
// Amount is parsed directly from the data field.
let indexed = log.topics;
let from = H160::from(indexed[1]);
let to = H160::from(indexed[2]);
let data = log.data;
let amount = U256::from_str(&hex::encode(data.0)).unwrap();
ret.push(ERC20Transfer {
tx_hash: log.transaction_hash.unwrap(),
from,
to,
amount,
});
}
Ok(ret)
})
}

// TODO: Extend this function to inspect the data field of a
// transaction, so that it supports contract wallets such as the Gnosis Multisig
// etc. There is no need to implement any ERC20 functionality here since these
Expand All @@ -71,9 +147,8 @@ mod tests {
// https://etherscan.io/tx/0x6fd1b68f02f4201a38662647b7f09170b159faec6af4825ae509beefeb8e8130
let to = "c92be489639a9c61f517bd3b955840fa19bc9b7c".parse().unwrap();
let value = "16345785d8a0000".into();
let nonce = 1.into();
let token_address = Some("B8c77482e45F1F44dE1745F52C74426C631bDD52".into());
let tx = make_tx(to, value, nonce, token_address);
let tx = make_tx(to, value, token_address);
assert_eq!(tx.to, token_address);
assert_eq!(tx.value, U256::from(0));
assert_eq!(hex::encode(tx.data), "a9059cbb000000000000000000000000c92be489639a9c61f517bd3b955840fa19bc9b7c000000000000000000000000000000000000000000000000016345785d8a0000")
Expand All @@ -83,9 +158,8 @@ mod tests {
fn test_eth_make_tx() {
let to = "c92be489639a9c61f517bd3b955840fa19bc9b7c".parse().unwrap();
let value = "16345785d8a0000".into();
let nonce = 1.into();
let token_address = None;
let tx = make_tx(to, value, nonce, token_address);
let tx = make_tx(to, value, token_address);
assert_eq!(tx.to, Some(to));
assert_eq!(tx.value, value);
let empty: Vec<u8> = Vec::new();
Expand Down
Loading

0 comments on commit 5ba5e80

Please sign in to comment.