Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade tx pool #528

Merged
merged 23 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
702e0ca
core: txpool stable underprice drop order, perf fixes (#16494)
gzliudan Apr 30, 2024
2060ff7
core: Ensure that local transactions aren't discarded as underpriced …
gzliudan May 7, 2024
88695ca
core: use a wrapped map w/ sync.RWMutex for TxPool.all to remove cont…
gzliudan Apr 30, 2024
fb89a54
core: fix transaction event asynchronicity (#16843)
gzliudan May 7, 2024
4f0f6e0
core: concurrent background transaction sender ecrecover (#16882)
gzliudan May 7, 2024
859308c
core: change comment to match code more closely (#16963)
gzliudan May 8, 2024
5b883de
core/tx_pool: reduce judgement levels (#16980)
gzliudan May 8, 2024
241201c
core: fixed typo (#17214)
gzliudan May 8, 2024
41b29a8
Fixed typo addresssByHeartbeat -> addressesByHeartbeat (#17243)
gzliudan May 8, 2024
6c657ef
core: priority mining (#17472)
gzliudan May 8, 2024
ec50ca3
core, eth, trie: use common/prque (#17508)
gzliudan May 9, 2024
a666465
core: fix a typo (#17733)
gzliudan May 9, 2024
6b87c07
core: fix comment typo (#18144)
gzliudan May 9, 2024
676c4e8
core: sanitize more TxPoolConfig fields (#17210)
gzliudan May 9, 2024
c5b22fb
core: make txpool handle reorg due to setHead (#19308)
gzliudan May 9, 2024
65baaaa
core: cache tx signature before obtaining lock (#19351)
gzliudan May 10, 2024
ddbf5d2
core: expose various counter metrics for grafana (#19692)
gzliudan May 10, 2024
74c7236
core: move TxPool reorg and events to background goroutine (#19705)
gzliudan May 10, 2024
6338a41
core: kill off managed state, use own tiny noncer for txpool (#19810)
gzliudan May 10, 2024
edaed4f
core: fix write concurrency in txpool (#19835)
gzliudan May 10, 2024
b708614
core, les: fix les unit tests (#19823)
gzliudan May 10, 2024
67825d8
core, light, params: implement eip2028 (#19931)
gzliudan May 10, 2024
742a7f9
core, metrics: switch some invalid counters to gauges (#20047)
gzliudan May 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions XDCx/XDCx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (

"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/syncmap"
Expand Down Expand Up @@ -105,7 +104,7 @@ func New(cfg *Config) *XDCX {
}
XDCX := &XDCX{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
tokenDecimalCache: tokenDecimalCache,
orderCache: orderCache,
}
Expand Down
11 changes: 5 additions & 6 deletions XDCxlending/XDCxlending.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
)
Expand Down Expand Up @@ -67,7 +66,7 @@ func New(XDCx *XDCx.XDCX) *Lending {
lendingTradeCache, _ := lru.New(defaultCacheLimit)
lending := &Lending{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
lendingItemHistory: itemCache,
lendingTradeHistory: lendingTradeCache,
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/XDPoS/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/clique"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

type Masternode struct {
Expand Down
2 changes: 1 addition & 1 deletion contracts/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *core.TxPool, m
}

// Create and send tx to smart contract for sign validate block.
nonce := pool.State().GetNonce(account.Address)
nonce := pool.Nonce(account.Address)
tx := CreateTxSign(block.Number(), block.Hash(), nonce, common.HexToAddress(common.BlockSigners))
txSigned, err := wallet.SignTx(account, tx, chainConfig.ChainId)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func genValueTx(nbytes int) func(int, *BlockGen) {
return func(i int, gen *BlockGen) {
toaddr := common.Address{}
data := make([]byte, nbytes)
gas, _ := IntrinsicGas(data, false, false)
gas, _ := IntrinsicGas(data, false, false, false)
tx, _ := types.SignTx(types.NewTransaction(gen.TxNonce(benchRootAddr), toaddr, big.NewInt(1), gas, nil, data), types.HomesteadSigner{}, benchRootKey)
gen.AddTx(tx)
}
Expand Down
21 changes: 13 additions & 8 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ import (
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"

"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/common/sort"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
Expand All @@ -53,7 +52,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -201,7 +199,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
Expand Down Expand Up @@ -1268,18 +1266,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -float32(block.NumberU64()))
bc.triegc.Push(root, -int64(block.NumberU64()))
if tradingTrieDb != nil {
tradingTrieDb.Reference(tradingRoot, common.Hash{})
}
if tradingService != nil {
tradingService.GetTriegc().Push(tradingRoot, -float32(block.NumberU64()))
tradingService.GetTriegc().Push(tradingRoot, -int64(block.NumberU64()))
}
if lendingTrieDb != nil {
lendingTrieDb.Reference(lendingRoot, common.Hash{})
}
if lendingService != nil {
lendingService.GetTriegc().Push(lendingRoot, -float32(block.NumberU64()))
lendingService.GetTriegc().Push(lendingRoot, -int64(block.NumberU64()))
}
if current := block.NumberU64(); current > triesInMemory {
// Find the next state trie we need to commit
Expand Down Expand Up @@ -1450,6 +1448,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
}
engine, _ := bc.Engine().(*XDPoS.XDPoS)

// Do a sanity check that the provided chain is actually ordered and linked
Expand Down Expand Up @@ -1491,6 +1493,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)

// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
// If the chain is terminating, stop processing blocks
Expand Down
36 changes: 17 additions & 19 deletions core/lending_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ import (
"sync"
"time"

"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"

"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -671,7 +669,7 @@ func (pool *LendingPool) add(tx *types.LendingTransaction, local bool) (bool, er
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.Debug("Discarding invalid lending transaction", "hash", hash, "userAddress", tx.UserAddress, "status", tx.Status, "err", err)
invalidTxCounter.Inc(1)
invalidTxMeter.Mark(1)
return false, err
}
from, _ := types.LendingSender(pool.signer, tx) // already validated
Expand All @@ -685,12 +683,12 @@ func (pool *LendingPool) add(tx *types.LendingTransaction, local bool) (bool, er
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
inserted, old := list.Add(tx)
if !inserted {
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
pool.all[tx.Hash()] = tx
pool.journalTx(from, tx)
Expand Down Expand Up @@ -726,13 +724,13 @@ func (pool *LendingPool) enqueueTx(hash common.Hash, tx *types.LendingTransactio
inserted, old := pool.queue[from].Add(tx)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
// Discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
queuedReplaceCounter.Inc(1)
queuedReplaceMeter.Mark(1)
}
pool.all[hash] = tx
return old != nil, nil
Expand Down Expand Up @@ -764,13 +762,13 @@ func (pool *LendingPool) promoteTx(addr common.Address, hash common.Hash, tx *ty
if !inserted {
// An older transaction was better, discard this
delete(pool.all, hash)
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all[hash] == nil {
Expand Down Expand Up @@ -981,7 +979,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
hash := tx.Hash()
delete(pool.all, hash)

queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
Expand All @@ -998,11 +996,11 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
Expand Down Expand Up @@ -1057,7 +1055,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
Expand All @@ -1066,7 +1064,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
Expand All @@ -1087,15 +1085,15 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
pool.removeTx(tx.Hash())
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
queuedRateLimitMeter.Mark(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash())
drop--
queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
}
}
}
Expand Down
Loading