From 3f60e5b215b16d8b73918544060cff9d8de8748f Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Sun, 14 Jul 2019 10:36:18 +0800 Subject: [PATCH 1/3] core: fix write coucurrency in txpool --- core/tx_noncer.go | 26 ++++++++++++++++++++++++++ core/tx_pool.go | 15 +++------------ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/core/tx_noncer.go b/core/tx_noncer.go index 98a78e087e2a..3a9c48ea1497 100644 --- a/core/tx_noncer.go +++ b/core/tx_noncer.go @@ -17,6 +17,8 @@ package core import ( + "sync" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" ) @@ -27,6 +29,7 @@ import ( type txNoncer struct { fallback *state.StateDB nonces map[common.Address]uint64 + lock sync.Mutex } // newTxNoncer creates a new virtual state database to track the pool nonces. @@ -40,6 +43,11 @@ func newTxNoncer(statedb *state.StateDB) *txNoncer { // get returns the current nonce of an account, falling back to a real state // database if the account is unknown. func (txn *txNoncer) get(addr common.Address) uint64 { + // We use mutex for get operation is the underlying + // state will mutate db even for read access. + txn.lock.Lock() + defer txn.lock.Unlock() + if _, ok := txn.nonces[addr]; !ok { txn.nonces[addr] = txn.fallback.GetNonce(addr) } @@ -49,5 +57,23 @@ func (txn *txNoncer) get(addr common.Address) uint64 { // set inserts a new virtual nonce into the virtual state database to be returned // whenever the pool requests it instead of reaching into the real state database. func (txn *txNoncer) set(addr common.Address, nonce uint64) { + txn.lock.Lock() + defer txn.lock.Unlock() + + txn.nonces[addr] = nonce +} + +// compareAndSet inserts or updates a new virtual nonce into the virtual state +// database if the compare callback is true. +func (txn *txNoncer) compareAndSet(addr common.Address, nonce uint64, compare func(uint64, uint64) bool) { + txn.lock.Lock() + defer txn.lock.Unlock() + + if _, ok := txn.nonces[addr]; !ok { + txn.nonces[addr] = txn.fallback.GetNonce(addr) + } + if compare == nil || !compare(txn.nonces[addr], nonce) { + return + } txn.nonces[addr] = nonce } diff --git a/core/tx_pool.go b/core/tx_pool.go index 43caf16b18ed..f54ab47417c9 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -420,9 +420,6 @@ func (pool *TxPool) SetGasPrice(price *big.Int) { // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. func (pool *TxPool) Nonce(addr common.Address) uint64 { - pool.mu.RLock() - defer pool.mu.RUnlock() - return pool.pendingNonces.get(addr) } @@ -854,9 +851,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { pool.enqueueTx(tx.Hash(), tx) } // Update the account nonce if needed - if nonce := tx.Nonce(); pool.pendingNonces.get(addr) > nonce { - pool.pendingNonces.set(addr, nonce) - } + pool.pendingNonces.compareAndSet(addr, tx.Nonce(), func(old uint64, new uint64) bool { return old > new }) // Reduce the pending counter pendingCounter.Dec(int64(1 + len(invalids))) return @@ -1232,9 +1227,7 @@ func (pool *TxPool) truncatePending() { pool.all.Remove(hash) // Update the account nonce to the dropped transaction - if nonce := tx.Nonce(); pool.pendingNonces.get(offenders[i]) > nonce { - pool.pendingNonces.set(offenders[i], nonce) - } + pool.pendingNonces.compareAndSet(offenders[i], tx.Nonce(), func(old uint64, new uint64) bool { return old > new }) log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) @@ -1261,9 +1254,7 @@ func (pool *TxPool) truncatePending() { pool.all.Remove(hash) // Update the account nonce to the dropped transaction - if nonce := tx.Nonce(); pool.pendingNonces.get(addr) > nonce { - pool.pendingNonces.set(addr, nonce) - } + pool.pendingNonces.compareAndSet(addr, tx.Nonce(), func(old uint64, new uint64) bool { return old > new }) log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) From b49f3bf2c5b5284f713c4ceaf189ee4978c150a1 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 16 Jul 2019 17:01:28 +0800 Subject: [PATCH 2/3] core: add rlock for pendingState read access --- core/tx_pool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/tx_pool.go b/core/tx_pool.go index f54ab47417c9..6bcb97b0c6c8 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -420,6 +420,9 @@ func (pool *TxPool) SetGasPrice(price *big.Int) { // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. func (pool *TxPool) Nonce(addr common.Address) uint64 { + pool.mu.RLock() + defer pool.mu.RUnlock() + return pool.pendingNonces.get(addr) } From 0baea4d7be71cfa98abac26361caff3e3b8a9984 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 17 Jul 2019 16:35:45 +0800 Subject: [PATCH 3/3] core: address comments --- core/tx_noncer.go | 8 ++++---- core/tx_pool.go | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/tx_noncer.go b/core/tx_noncer.go index 3a9c48ea1497..aa87c643aee2 100644 --- a/core/tx_noncer.go +++ b/core/tx_noncer.go @@ -63,16 +63,16 @@ func (txn *txNoncer) set(addr common.Address, nonce uint64) { txn.nonces[addr] = nonce } -// compareAndSet inserts or updates a new virtual nonce into the virtual state -// database if the compare callback is true. -func (txn *txNoncer) compareAndSet(addr common.Address, nonce uint64, compare func(uint64, uint64) bool) { +// setIfLower updates a new virtual nonce into the virtual state database if the +// the new one is lower. +func (txn *txNoncer) setIfLower(addr common.Address, nonce uint64) { txn.lock.Lock() defer txn.lock.Unlock() if _, ok := txn.nonces[addr]; !ok { txn.nonces[addr] = txn.fallback.GetNonce(addr) } - if compare == nil || !compare(txn.nonces[addr], nonce) { + if txn.nonces[addr] <= nonce { return } txn.nonces[addr] = nonce diff --git a/core/tx_pool.go b/core/tx_pool.go index 6bcb97b0c6c8..c41d3fbd4a2c 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -854,7 +854,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { pool.enqueueTx(tx.Hash(), tx) } // Update the account nonce if needed - pool.pendingNonces.compareAndSet(addr, tx.Nonce(), func(old uint64, new uint64) bool { return old > new }) + pool.pendingNonces.setIfLower(addr, tx.Nonce()) // Reduce the pending counter pendingCounter.Dec(int64(1 + len(invalids))) return @@ -1230,7 +1230,7 @@ func (pool *TxPool) truncatePending() { pool.all.Remove(hash) // Update the account nonce to the dropped transaction - pool.pendingNonces.compareAndSet(offenders[i], tx.Nonce(), func(old uint64, new uint64) bool { return old > new }) + pool.pendingNonces.setIfLower(offenders[i], tx.Nonce()) log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) @@ -1257,7 +1257,7 @@ func (pool *TxPool) truncatePending() { pool.all.Remove(hash) // Update the account nonce to the dropped transaction - pool.pendingNonces.compareAndSet(addr, tx.Nonce(), func(old uint64, new uint64) bool { return old > new }) + pool.pendingNonces.setIfLower(addr, tx.Nonce()) log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps))