Skip to content

Commit

Permalink
core, cmd: add generic LRU implementation (ethereum#26162)
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed Aug 25, 2024
1 parent 8d7c476 commit b7bc4f7
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 87 deletions.
8 changes: 4 additions & 4 deletions cmd/gc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/ethdb/leveldb"
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
"github.com/XinFinOrg/XDPoSChain/common/lru"
)

var (
Expand All @@ -29,7 +29,7 @@ var (
sercureKey = []byte("secure-key-")
nWorker = runtime.NumCPU() / 2
cleanAddress = []common.Address{common.BlockSignersBinary}
cache *lru.Cache
cache *lru.Cache[common.Hash, struct{}]
finish = int32(0)
running = true
stateRoots = make(chan TrieRoot)
Expand Down Expand Up @@ -58,7 +58,7 @@ func main() {
currentHeader := core.GetHeader(lddb, head, core.GetBlockNumber(lddb, head))
tridb := trie.NewDatabase(lddb)
catchEventInterupt(db)
cache, _ = lru.New(*cacheSize)
cache = lru.NewCache[common.Hash, struct{}](*cacheSize)
go func() {
for i := uint64(1); i <= currentHeader.Number.Uint64(); i++ {
hash := core.GetCanonicalHash(lddb, i)
Expand Down Expand Up @@ -222,7 +222,7 @@ func processNodes(node StateNode, db *leveldb.Database) ([17]*StateNode, [17]*[]
}
}
}
cache.Add(commonHash, true)
cache.Add(commonHash, struct{}{})
}
return newNodes, keys, number
}
Expand Down
110 changes: 47 additions & 63 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/lru"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/common/sort"
Expand All @@ -52,7 +53,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
)

var (
Expand Down Expand Up @@ -140,37 +140,40 @@ type BlockChain struct {

stateCache state.Database // State database to reuse between imports (contains state cache)

bodyCache *lru.Cache // Cache for the most recent block bodies
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
blockCache *lru.Cache // Cache for the most recent entire blocks
futureBlocks *lru.Cache // future blocks are blocks added for later processing
resultProcess *lru.Cache // Cache for processed blocks
calculatingBlock *lru.Cache // Cache for processing blocks
downloadingBlock *lru.Cache // Cache for downloading blocks (avoid duplication from fetcher)
quit chan struct{} // blockchain quit channel
running int32 // running must be called atomically
// procInterrupt must be atomically called
procInterrupt int32 // interrupt signaler for block processing
bodyCache *lru.Cache[common.Hash, *types.Body] // Cache for the most recent block bodies
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue] // Cache for the most recent block bodies in RLP encoded format
blockCache *lru.Cache[common.Hash, *types.Block] // Cache for the most recent entire blocks
resultProcess *lru.Cache[common.Hash, *ResultProcessBlock] // Cache for processed blocks
calculatingBlock *lru.Cache[common.Hash, *CalculatedBlock] // Cache for processing blocks
downloadingBlock *lru.Cache[common.Hash, struct{}] // Cache for downloading blocks (avoid duplication from fetcher)
badBlocks *lru.Cache[common.Hash, *types.Header] // Bad block cache

// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]

wg sync.WaitGroup // chain processing wait group for shutting down
quit chan struct{} // shutdown signal, closed in Stop.
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
processor Processor // block processor interface
validator Validator // block and state validator interface
vmConfig vm.Config

badBlocks *lru.Cache // Bad block cache
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
IPCEndpoint string
Client bind.ContractBackend // Global ipc client instance.

// Blocks hash array by block number
// cache field for tracking finality purpose, can't use for tracking block vs block relationship
blocksHashCache *lru.Cache
blocksHashCache *lru.Cache[uint64, []common.Hash]

resultTrade *lru.Cache // trades result: key - takerOrderHash, value: trades corresponding to takerOrder
rejectedOrders *lru.Cache // rejected orders: key - takerOrderHash, value: rejected orders corresponding to takerOrder
resultLendingTrade *lru.Cache
rejectedLendingItem *lru.Cache
finalizedTrade *lru.Cache // include both trades which force update to closed/liquidated by the protocol
resultTrade *lru.Cache[common.Hash, interface{}] // trades result: key - takerOrderHash, value: trades corresponding to takerOrder
rejectedOrders *lru.Cache[common.Hash, interface{}] // rejected orders: key - takerOrderHash, value: rejected orders corresponding to takerOrder
resultLendingTrade *lru.Cache[common.Hash, interface{}]
rejectedLendingItem *lru.Cache[common.Hash, interface{}]
finalizedTrade *lru.Cache[common.Hash, interface{}] // include both trades which force update to closed/liquidated by the protocol
}

// NewBlockChain returns a fully initialised block chain using information
Expand All @@ -183,47 +186,30 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
TrieTimeLimit: 5 * time.Minute,
}
}
bodyCache, _ := lru.New(bodyCacheLimit)
bodyRLPCache, _ := lru.New(bodyCacheLimit)
blockCache, _ := lru.New(blockCacheLimit)
blocksHashCache, _ := lru.New(blocksHashCacheLimit)
futureBlocks, _ := lru.New(maxFutureBlocks)
badBlocks, _ := lru.New(badBlockLimit)
resultProcess, _ := lru.New(blockCacheLimit)
preparingBlock, _ := lru.New(blockCacheLimit)
downloadingBlock, _ := lru.New(blockCacheLimit)

// for XDCx
resultTrade, _ := lru.New(tradingstate.OrderCacheLimit)
rejectedOrders, _ := lru.New(tradingstate.OrderCacheLimit)

// XDCxlending
resultLendingTrade, _ := lru.New(tradingstate.OrderCacheLimit)
rejectedLendingItem, _ := lru.New(tradingstate.OrderCacheLimit)
finalizedTrade, _ := lru.New(tradingstate.OrderCacheLimit)

bc := &BlockChain{
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
bodyRLPCache: bodyRLPCache,
blockCache: blockCache,
futureBlocks: futureBlocks,
resultProcess: resultProcess,
calculatingBlock: preparingBlock,
downloadingBlock: downloadingBlock,
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
resultProcess: lru.NewCache[common.Hash, *ResultProcessBlock](blockCacheLimit),
calculatingBlock: lru.NewCache[common.Hash, *CalculatedBlock](blockCacheLimit),
downloadingBlock: lru.NewCache[common.Hash, struct{}](blockCacheLimit),
engine: engine,
vmConfig: vmConfig,
badBlocks: badBlocks,
blocksHashCache: blocksHashCache,
resultTrade: resultTrade,
rejectedOrders: rejectedOrders,
resultLendingTrade: resultLendingTrade,
rejectedLendingItem: rejectedLendingItem,
finalizedTrade: finalizedTrade,
badBlocks: lru.NewCache[common.Hash, *types.Header](badBlockLimit),
blocksHashCache: lru.NewCache[uint64, []common.Hash](blocksHashCacheLimit),
resultTrade: lru.NewCache[common.Hash, interface{}](tradingstate.OrderCacheLimit),
rejectedOrders: lru.NewCache[common.Hash, interface{}](tradingstate.OrderCacheLimit),
resultLendingTrade: lru.NewCache[common.Hash, interface{}](tradingstate.OrderCacheLimit),
rejectedLendingItem: lru.NewCache[common.Hash, interface{}](tradingstate.OrderCacheLimit),
finalizedTrade: lru.NewCache[common.Hash, interface{}](tradingstate.OrderCacheLimit),
}
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
Expand Down Expand Up @@ -720,8 +706,7 @@ func (bc *BlockChain) Genesis() *types.Block {
func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {
// Short circuit if the body's already in the cache, retrieve otherwise
if cached, ok := bc.bodyCache.Get(hash); ok {
body := cached.(*types.Body)
return body
return cached
}
body := GetBody(bc.db, hash, bc.hc.GetBlockNumber(hash))
if body == nil {
Expand All @@ -737,7 +722,7 @@ func (bc *BlockChain) GetBody(hash common.Hash) *types.Body {
func (bc *BlockChain) GetBodyRLP(hash common.Hash) rlp.RawValue {
// Short circuit if the body's already in the cache, retrieve otherwise
if cached, ok := bc.bodyRLPCache.Get(hash); ok {
return cached.(rlp.RawValue)
return cached
}
body := GetBodyRLP(bc.db, hash, bc.hc.GetBlockNumber(hash))
if len(body) == 0 {
Expand Down Expand Up @@ -794,7 +779,7 @@ func (bc *BlockChain) HasBlockAndFullState(hash common.Hash, number uint64) bool
func (bc *BlockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
// Short circuit if the block's already in the cache, retrieve otherwise
if block, ok := bc.blockCache.Get(hash); ok {
return block.(*types.Block)
return block
}
block := GetBlock(bc.db, hash, number)
if block == nil {
Expand Down Expand Up @@ -847,7 +832,7 @@ func (bc *BlockChain) GetBlocksHashCache(number uint64) []common.Hash {
cached, ok := bc.blocksHashCache.Get(number)

if ok {
return cached.([]common.Hash)
return cached
}
return nil
}
Expand Down Expand Up @@ -980,7 +965,7 @@ func (bc *BlockChain) procFutureBlocks() {
blocks := make([]*types.Block, 0, bc.futureBlocks.Len())
for _, hash := range bc.futureBlocks.Keys() {
if block, exist := bc.futureBlocks.Peek(hash); exist {
blocks = append(blocks, block.(*types.Block))
blocks = append(blocks, block)
}
}
if len(blocks) > 0 {
Expand Down Expand Up @@ -1491,7 +1476,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
for i, block := range chain {
headers[i] = block.Header()
seals[i] = verifySeals
bc.downloadingBlock.Add(block.Hash(), true)
bc.downloadingBlock.Add(block.Hash(), struct{}{})
}
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)
Expand Down Expand Up @@ -1805,11 +1790,11 @@ func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*Resu
if verifiedM2 {
if result, check := bc.resultProcess.Get(block.HashNoValidator()); check {
log.Debug("Get result block from cache ", "number", block.NumberU64(), "hash", block.Hash(), "hash no validator", block.HashNoValidator())
return result.(*ResultProcessBlock), nil
return result, nil
}
log.Debug("Not found cache prepare block ", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.HashNoValidator())
if calculatedBlock, _ := bc.calculatingBlock.Get(block.HashNoValidator()); calculatedBlock != nil {
calculatedBlock.(*CalculatedBlock).stop = true
calculatedBlock.stop = true
}
}
calculatedBlock = &CalculatedBlock{block, false}
Expand Down Expand Up @@ -2007,7 +1992,7 @@ func (bc *BlockChain) UpdateBlocksHashCache(block *types.Block) []common.Hash {
cached, ok := bc.blocksHashCache.Get(blockNumber)

if ok {
hashArr := cached.([]common.Hash)
hashArr := cached
hashArr = append(hashArr, block.Hash())
bc.blocksHashCache.Remove(blockNumber)
bc.blocksHashCache.Add(blockNumber, hashArr)
Expand Down Expand Up @@ -2340,8 +2325,7 @@ type BadBlockArgs struct {
func (bc *BlockChain) BadBlocks() ([]BadBlockArgs, error) {
headers := make([]BadBlockArgs, 0, bc.badBlocks.Len())
for _, hash := range bc.badBlocks.Keys() {
if hdr, exist := bc.badBlocks.Peek(hash); exist {
header := hdr.(*types.Header)
if header, exist := bc.badBlocks.Peek(hash); exist {
headers = append(headers, BadBlockArgs{header.Hash(), header})
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,7 +1367,7 @@ func TestBlocksHashCacheUpdate(t *testing.T) {
}
cachedAt, _ := chain.blocksHashCache.Get(uint64(3))

if len(cachedAt.([]common.Hash)) != 2 {
if len(cachedAt) != 2 {
t.Error("BlocksHashCache doesn't add new cache after concating new fork ")
}
})
Expand Down
24 changes: 10 additions & 14 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (
"time"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/lru"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
lru "github.com/hashicorp/golang-lru"
)

const (
Expand All @@ -56,9 +56,9 @@ type HeaderChain struct {
currentHeader atomic.Value // Current head of the header chain (may be above the block chain!)
currentHeaderHash common.Hash // Hash of the current head of the header chain (prevent recomputing all the time)

headerCache *lru.Cache // Cache for the most recent block headers
tdCache *lru.Cache // Cache for the most recent block total difficulties
numberCache *lru.Cache // Cache for the most recent block numbers
headerCache *lru.Cache[common.Hash, *types.Header]
tdCache *lru.Cache[common.Hash, *big.Int] // most recent total difficulties
numberCache *lru.Cache[common.Hash, uint64] // most recent block numbers

procInterrupt func() bool

Expand All @@ -72,10 +72,6 @@ type HeaderChain struct {
// procInterrupt points to the parent's interrupt semaphore
// wg points to the parent's shutdown wait group
func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) {
headerCache, _ := lru.New(headerCacheLimit)
tdCache, _ := lru.New(tdCacheLimit)
numberCache, _ := lru.New(numberCacheLimit)

// Seed a fast but crypto originating random generator
seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
Expand All @@ -85,9 +81,9 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
hc := &HeaderChain{
config: config,
chainDb: chainDb,
headerCache: headerCache,
tdCache: tdCache,
numberCache: numberCache,
headerCache: lru.NewCache[common.Hash, *types.Header](headerCacheLimit),
tdCache: lru.NewCache[common.Hash, *big.Int](tdCacheLimit),
numberCache: lru.NewCache[common.Hash, uint64](numberCacheLimit),
procInterrupt: procInterrupt,
rand: mrand.New(mrand.NewSource(seed.Int64())),
engine: engine,
Expand All @@ -113,7 +109,7 @@ func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine c
// from the cache or database
func (hc *HeaderChain) GetBlockNumber(hash common.Hash) uint64 {
if cached, ok := hc.numberCache.Get(hash); ok {
return cached.(uint64)
return cached
}
number := GetBlockNumber(hc.chainDb, hash)
if number != missingNumber {
Expand Down Expand Up @@ -312,7 +308,7 @@ func (hc *HeaderChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []co
func (hc *HeaderChain) GetTd(hash common.Hash, number uint64) *big.Int {
// Short circuit if the td's already in the cache, retrieve otherwise
if cached, ok := hc.tdCache.Get(hash); ok {
return cached.(*big.Int)
return cached
}
td := GetTd(hc.chainDb, hash, number)
if td == nil {
Expand Down Expand Up @@ -344,7 +340,7 @@ func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) err
func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header {
// Short circuit if the header's already in the cache, retrieve otherwise
if header, ok := hc.headerCache.Get(hash); ok {
return header.(*types.Header)
return header
}
header := GetHeader(hc.chainDb, hash, number)
if header == nil {
Expand Down
9 changes: 4 additions & 5 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"fmt"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/lru"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
)

const (
Expand Down Expand Up @@ -107,16 +107,15 @@ func NewDatabase(db ethdb.Database) Database {
// is safe for concurrent use and retains a lot of collapsed RLP trie nodes in a
// large memory cache.
func NewDatabaseWithCache(db ethdb.Database, cache int) Database {
csc, _ := lru.New(codeSizeCacheSize)
return &cachingDB{
db: trie.NewDatabaseWithCache(db, cache),
codeSizeCache: csc,
codeSizeCache: lru.NewCache[common.Hash, int](codeSizeCacheSize),
}
}

type cachingDB struct {
db *trie.Database
codeSizeCache *lru.Cache
codeSizeCache *lru.Cache[common.Hash, int]
}

// OpenTrie opens the main account trie at a specific root hash.
Expand Down Expand Up @@ -151,7 +150,7 @@ func (db *cachingDB) ContractCode(addrHash, codeHash common.Hash) ([]byte, error
// ContractCodeSize retrieves a particular contracts code's size.
func (db *cachingDB) ContractCodeSize(addrHash, codeHash common.Hash) (int, error) {
if cached, ok := db.codeSizeCache.Get(codeHash); ok {
return cached.(int), nil
return cached, nil
}
code, err := db.ContractCode(addrHash, codeHash)
return len(code), err
Expand Down

0 comments on commit b7bc4f7

Please sign in to comment.