Skip to content

Commit

Permalink
Merge pull request #528 from gzliudan/tx-pool
Browse files Browse the repository at this point in the history
upgrade tx pool
  • Loading branch information
gzliudan authored May 10, 2024
2 parents bfa3bb4 + 742a7f9 commit 24fa7e3
Show file tree
Hide file tree
Showing 25 changed files with 1,305 additions and 978 deletions.
11 changes: 5 additions & 6 deletions XDCx/XDCx.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (

"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/syncmap"
Expand Down Expand Up @@ -105,7 +104,7 @@ func New(cfg *Config) *XDCX {
}
XDCX := &XDCX{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
tokenDecimalCache: tokenDecimalCache,
orderCache: orderCache,
}
Expand Down
11 changes: 5 additions & 6 deletions XDCxlending/XDCxlending.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
)
Expand Down Expand Up @@ -67,7 +66,7 @@ func New(XDCx *XDCx.XDCX) *Lending {
lendingTradeCache, _ := lru.New(defaultCacheLimit)
lending := &Lending{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
lendingItemHistory: itemCache,
lendingTradeHistory: lendingTradeCache,
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/XDPoS/utils/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/clique"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

type Masternode struct {
Expand Down
2 changes: 1 addition & 1 deletion contracts/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *core.TxPool, m
}

// Create and send tx to smart contract for sign validate block.
nonce := pool.State().GetNonce(account.Address)
nonce := pool.Nonce(account.Address)
tx := CreateTxSign(block.Number(), block.Hash(), nonce, common.HexToAddress(common.BlockSigners))
txSigned, err := wallet.SignTx(account, tx, chainConfig.ChainId)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func genValueTx(nbytes int) func(int, *BlockGen) {
return func(i int, gen *BlockGen) {
toaddr := common.Address{}
data := make([]byte, nbytes)
gas, _ := IntrinsicGas(data, false, false)
gas, _ := IntrinsicGas(data, false, false, false)
tx, _ := types.SignTx(types.NewTransaction(gen.TxNonce(benchRootAddr), toaddr, big.NewInt(1), gas, nil, data), types.HomesteadSigner{}, benchRootKey)
gen.AddTx(tx)
}
Expand Down
21 changes: 13 additions & 8 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ import (
"sync/atomic"
"time"

"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"

"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/common/sort"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
Expand All @@ -53,7 +52,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -201,7 +199,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
Expand Down Expand Up @@ -1268,18 +1266,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -float32(block.NumberU64()))
bc.triegc.Push(root, -int64(block.NumberU64()))
if tradingTrieDb != nil {
tradingTrieDb.Reference(tradingRoot, common.Hash{})
}
if tradingService != nil {
tradingService.GetTriegc().Push(tradingRoot, -float32(block.NumberU64()))
tradingService.GetTriegc().Push(tradingRoot, -int64(block.NumberU64()))
}
if lendingTrieDb != nil {
lendingTrieDb.Reference(lendingRoot, common.Hash{})
}
if lendingService != nil {
lendingService.GetTriegc().Push(lendingRoot, -float32(block.NumberU64()))
lendingService.GetTriegc().Push(lendingRoot, -int64(block.NumberU64()))
}
if current := block.NumberU64(); current > triesInMemory {
// Find the next state trie we need to commit
Expand Down Expand Up @@ -1450,6 +1448,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
}
engine, _ := bc.Engine().(*XDPoS.XDPoS)

// Do a sanity check that the provided chain is actually ordered and linked
Expand Down Expand Up @@ -1491,6 +1493,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)

// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)

// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
// If the chain is terminating, stop processing blocks
Expand Down
36 changes: 17 additions & 19 deletions core/lending_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ import (
"sync"
"time"

"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"

"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

var (
Expand Down Expand Up @@ -671,7 +669,7 @@ func (pool *LendingPool) add(tx *types.LendingTransaction, local bool) (bool, er
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.Debug("Discarding invalid lending transaction", "hash", hash, "userAddress", tx.UserAddress, "status", tx.Status, "err", err)
invalidTxCounter.Inc(1)
invalidTxMeter.Mark(1)
return false, err
}
from, _ := types.LendingSender(pool.signer, tx) // already validated
Expand All @@ -685,12 +683,12 @@ func (pool *LendingPool) add(tx *types.LendingTransaction, local bool) (bool, er
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
inserted, old := list.Add(tx)
if !inserted {
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
pool.all[tx.Hash()] = tx
pool.journalTx(from, tx)
Expand Down Expand Up @@ -726,13 +724,13 @@ func (pool *LendingPool) enqueueTx(hash common.Hash, tx *types.LendingTransactio
inserted, old := pool.queue[from].Add(tx)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
// Discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
queuedReplaceCounter.Inc(1)
queuedReplaceMeter.Mark(1)
}
pool.all[hash] = tx
return old != nil, nil
Expand Down Expand Up @@ -764,13 +762,13 @@ func (pool *LendingPool) promoteTx(addr common.Address, hash common.Hash, tx *ty
if !inserted {
// An older transaction was better, discard this
delete(pool.all, hash)
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all[hash] == nil {
Expand Down Expand Up @@ -981,7 +979,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
hash := tx.Hash()
delete(pool.all, hash)

queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
Expand All @@ -998,11 +996,11 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
Expand Down Expand Up @@ -1057,7 +1055,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
Expand All @@ -1066,7 +1064,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
Expand All @@ -1087,15 +1085,15 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
pool.removeTx(tx.Hash())
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
queuedRateLimitMeter.Mark(int64(size))
continue
}
// Otherwise drop only last few transactions
txs := list.Flatten()
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash())
drop--
queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
}
}
}
Expand Down
Loading

0 comments on commit 24fa7e3

Please sign in to comment.