Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch out std mutexes to parking lot and spin mutexes #2512

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions lib/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ rocksdb = { version = "0.21", default-features = false }
statrs = "0.16"
rustc-hex = "2.1"
rustc_version_runtime = "0.2.1"
parking_lot = "0.12.1"
spin = "0.9.8"

### eth

Expand Down
2 changes: 2 additions & 0 deletions lib/ain-evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ serde_json.workspace = true
statrs.workspace = true
rustc-hex.workspace = true
ethabi.workspace = true
parking_lot.workspace = true
spin.workspace = true

# Trie dependencies
hash-db.workspace = true
Expand Down
36 changes: 19 additions & 17 deletions lib/ain-evm/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::{
collections::{BTreeSet, HashMap},
num::NonZeroUsize,
path::PathBuf,
sync::{Arc, Mutex},
sync::Arc,
};

use parking_lot::Mutex;

use ain_contracts::{
dst20_address_from_token_id, get_transfer_domain_contract,
get_transferdomain_dst20_transfer_function, get_transferdomain_native_transfer_function,
Expand Down Expand Up @@ -36,7 +38,7 @@ use crate::{
pub type XHash = String;

pub struct SignedTxCache {
inner: Mutex<LruCache<String, SignedTx>>,
inner: spin::Mutex<LruCache<String, SignedTx>>,
}

const DEFAULT_CACHE_SIZE: usize = 10000;
Expand All @@ -50,12 +52,12 @@ impl Default for SignedTxCache {
impl SignedTxCache {
pub fn new(capacity: usize) -> Self {
Self {
inner: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
inner: spin::Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
}
}

pub fn try_get_or_create(&self, key: &str) -> Result<SignedTx> {
let mut guard = self.inner.lock().unwrap();
let mut guard = self.inner.lock();
debug!("[signed-tx-cache]::get: {}", key);
let res = guard.try_get_or_insert(key.to_string(), || {
debug!("[signed-tx-cache]::create {}", key);
Expand All @@ -67,7 +69,7 @@ impl SignedTxCache {
pub fn try_get_or_create_from_tx(&self, tx: &TransactionV2) -> Result<SignedTx> {
let data = EnvelopedEncodable::encode(tx);
let key = hex::encode(&data);
let mut guard = self.inner.lock().unwrap();
let mut guard = self.inner.lock();
debug!("[signed-tx-cache]::get from tx: {}", &key);
let res = guard.try_get_or_insert(key.clone(), || {
debug!("[signed-tx-cache]::create from tx {}", &key);
Expand All @@ -78,8 +80,8 @@ impl SignedTxCache {
}

struct TxValidationCache {
validated: Mutex<LruCache<(U256, H256, String, bool), ValidateTxInfo>>,
stateless: Mutex<LruCache<String, ValidateTxInfo>>,
validated: spin::Mutex<LruCache<(U256, H256, String, bool), ValidateTxInfo>>,
stateless: spin::Mutex<LruCache<String, ValidateTxInfo>>,
}

impl Default for TxValidationCache {
Expand All @@ -91,35 +93,35 @@ impl Default for TxValidationCache {
impl TxValidationCache {
pub fn new(capacity: usize) -> Self {
Self {
validated: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
stateless: Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
validated: spin::Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
stateless: spin::Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())),
}
}

pub fn get(&self, key: &(U256, H256, String, bool)) -> Option<ValidateTxInfo> {
self.validated.lock().unwrap().get(key).cloned()
self.validated.lock().get(key).cloned()
}

pub fn get_stateless(&self, key: &str) -> Option<ValidateTxInfo> {
self.stateless.lock().unwrap().get(key).cloned()
self.stateless.lock().get(key).cloned()
}

pub fn set(&self, key: (U256, H256, String, bool), value: ValidateTxInfo) -> ValidateTxInfo {
let mut cache = self.validated.lock().unwrap();
let mut cache = self.validated.lock();
cache.put(key, value.clone());
value
}

pub fn set_stateless(&self, key: String, value: ValidateTxInfo) -> ValidateTxInfo {
let mut cache = self.stateless.lock().unwrap();
let mut cache = self.stateless.lock();
cache.put(key, value.clone());
value
}

// To be used on new block or any known state changes. Only clears fully validated TX cache.
// Stateless cache can be kept across blocks and is handled by LRU itself
pub fn clear(&self) {
let mut cache = self.validated.lock().unwrap();
let mut cache = self.validated.lock();
cache.clear()
}
}
Expand Down Expand Up @@ -1011,7 +1013,7 @@ impl EVMCoreService {

pub fn get_next_account_nonce(&self, address: H160, state_root: H256) -> Result<U256> {
let state_root_nonce = self.get_nonce_from_state_root(address, state_root)?;
let mut nonce_store = self.nonce_store.lock().unwrap();
let mut nonce_store = self.nonce_store.lock();
match nonce_store.entry(address) {
std::collections::hash_map::Entry::Vacant(_) => Ok(state_root_nonce),
std::collections::hash_map::Entry::Occupied(e) => {
Expand All @@ -1035,7 +1037,7 @@ impl EVMCoreService {
}

pub fn store_account_nonce(&self, address: H160, nonce: U256) -> bool {
let mut nonce_store = self.nonce_store.lock().unwrap();
let mut nonce_store = self.nonce_store.lock();
nonce_store.entry(address).or_default();

match nonce_store.entry(address) {
Expand All @@ -1048,7 +1050,7 @@ impl EVMCoreService {
}

pub fn clear_account_nonce(&self) {
let mut nonce_store = self.nonce_store.lock().unwrap();
let mut nonce_store = self.nonce_store.lock();
nonce_store.clear()
}
}
4 changes: 2 additions & 2 deletions lib/ain-evm/src/evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl EVMServices {
mnview_ptr: usize,
) -> Result<FinalizedBlockInfo> {
let tx_queue = self.core.tx_queues.get(queue_id)?;
let mut queue = tx_queue.data.lock().unwrap();
let mut queue = tx_queue.data.lock();

let queue_txs_len = queue.transactions.len();
let mut all_transactions = Vec::with_capacity(queue_txs_len);
Expand Down Expand Up @@ -376,7 +376,7 @@ impl EVMServices {
pub unsafe fn commit_queue(&self, queue_id: u64) -> Result<()> {
{
let tx_queue = self.core.tx_queues.get(queue_id)?;
let queue = tx_queue.data.lock().unwrap();
let queue = tx_queue.data.lock();
let Some(BlockData { block, receipts }) = queue.block_data.clone() else {
return Err(format_err!("no constructed EVM block exist in queue id").into());
};
Expand Down
7 changes: 4 additions & 3 deletions lib/ain-evm/src/services.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
Arc,
},
thread::{self, JoinHandle},
};

use parking_lot::Mutex;

use anyhow::Result;
use jsonrpsee_server::ServerHandle as HttpServerHandle;
use tokio::{
Expand Down Expand Up @@ -68,7 +70,7 @@ impl Services {
}

pub fn stop_network(&self) -> Result<()> {
let mut json_rpc_handle = self.json_rpc.lock().unwrap();
let mut json_rpc_handle = self.json_rpc.lock();
if (json_rpc_handle).is_none() {
// Server was never started
return Ok(());
Expand All @@ -88,7 +90,6 @@ impl Services {

self.tokio_worker
.lock()
.unwrap()
.take()
.expect("runtime terminated?")
.join()
Expand Down
28 changes: 12 additions & 16 deletions lib/ain-evm/src/txqueue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
};
use std::{collections::HashMap, sync::Arc};

use parking_lot::{Mutex, RwLock};

use ethereum::{Block, TransactionV2};
use ethereum_types::{H256, U256};
Expand Down Expand Up @@ -52,7 +51,7 @@ impl TransactionQueueMap {
if queue_id == 0 {
continue;
};
let mut write_guard = self.queues.write().unwrap();
let mut write_guard = self.queues.write();

if let std::collections::hash_map::Entry::Vacant(e) = write_guard.entry(queue_id) {
e.insert(Arc::new(TransactionQueue::new(target_block, state_root)));
Expand All @@ -70,7 +69,7 @@ impl TransactionQueueMap {
/// across all usages. Note: To be replaced with a proper lock flow later.
///
pub unsafe fn remove(&self, queue_id: u64) -> Option<Arc<TransactionQueue>> {
self.queues.write().unwrap().remove(&queue_id)
self.queues.write().remove(&queue_id)
}

/// Returns an atomic reference counting pointer of the `TransactionQueue` associated with the provided queue ID.
Expand All @@ -91,7 +90,6 @@ impl TransactionQueueMap {
Ok(Arc::clone(
self.queues
.read()
.unwrap()
.get(&queue_id)
.ok_or(QueueError::NoSuchQueue)?,
))
Expand Down Expand Up @@ -193,7 +191,7 @@ impl TransactionQueueMap {
where
F: FnOnce(&TransactionQueue) -> T,
{
match self.queues.read().unwrap().get(&queue_id) {
match self.queues.read().get(&queue_id) {
Some(queue) => Ok(f(queue)),
None => Err(QueueError::NoSuchQueue),
}
Expand Down Expand Up @@ -268,7 +266,7 @@ impl TransactionQueue {
gas_used: U256,
state_root: H256,
) -> Result<()> {
let mut data = self.data.lock().unwrap();
let mut data = self.data.lock();

data.total_gas_used += gas_used;

Expand All @@ -282,7 +280,7 @@ impl TransactionQueue {
}

pub fn remove_txs_above_hash(&self, target_hash: XHash) -> Result<Vec<XHash>> {
let mut data = self.data.lock().unwrap();
let mut data = self.data.lock();
let mut removed_txs = Vec::new();

if let Some(index) = data
Expand All @@ -306,29 +304,28 @@ impl TransactionQueue {
}

pub fn get_queue_txs_cloned(&self) -> Vec<QueueTxItem> {
self.data.lock().unwrap().transactions.clone()
self.data.lock().transactions.clone()
}

pub fn get_total_gas_used(&self) -> U256 {
self.data.lock().unwrap().total_gas_used
self.data.lock().total_gas_used
}

pub fn get_target_block(&self) -> U256 {
self.data.lock().unwrap().target_block
self.data.lock().target_block
}

pub fn get_state_root_from_native_hash(&self, hash: XHash) -> Option<H256> {
self.data
.lock()
.unwrap()
.transactions
.iter()
.find(|tx_item| tx_item.tx_hash == hash)
.map(|tx_item| tx_item.state_root)
}

pub fn get_latest_state_root(&self) -> H256 {
let data = self.data.lock().unwrap();
let data = self.data.lock();
data.transactions
.last()
.map_or(data.initial_state_root, |tx_item| tx_item.state_root)
Expand All @@ -337,7 +334,6 @@ impl TransactionQueue {
pub fn is_queued(&self, tx: &QueueTx) -> bool {
self.data
.lock()
.unwrap()
.transactions
.iter()
.any(|queued| &queued.tx == tx)
Expand Down
2 changes: 1 addition & 1 deletion lib/ain-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub fn init_network_json_rpc_service(runtime: &Services, addr: &str) -> Result<(
methods.merge(MetachainNetRPCModule::new(Arc::clone(&runtime.evm)).into_rpc())?;
methods.merge(MetachainWeb3RPCModule::new(Arc::clone(&runtime.evm)).into_rpc())?;

*runtime.json_rpc.lock().unwrap() = Some(server.start(methods)?);
*runtime.json_rpc.lock() = Some(server.start(methods)?);
Ok(())
}

Expand Down