From 5640be0bf56836fdecab672c65dafd2195f3302b Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Mon, 18 Nov 2024 14:34:50 -0800 Subject: [PATCH] handle passing of concurrency --- core/blockchain.go | 9 ++++++--- core/state/prefetcher_database.go | 26 +++++++++++++++++--------- core/state/statedb.go | 11 ++--------- core/state/trie_prefetcher.go | 3 +++ miner/worker.go | 2 +- 5 files changed, 29 insertions(+), 22 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index db3a49bdb0..61044cb87d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1316,7 +1316,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error { blockStateInitTimer.Inc(time.Since(substart).Milliseconds()) // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism) + statedb.StartPrefetcher("chain") activeState = statedb // Process block using the parent state as reference point @@ -1675,7 +1675,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block) } // Enable prefetching to pull in trie node paths while processing transactions - statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism) + statedb.StartPrefetcher("chain") defer func() { statedb.StopPrefetcher() }() @@ -2073,7 +2073,10 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error { bc.hc.SetCurrentHeader(block.Header()) lastAcceptedHash := block.Hash() - bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb) + bc.stateCache = state.WithPrefetcher( + state.NewDatabaseWithNodeDB(bc.db, bc.triedb), + bc.cacheConfig.TriePrefetcherParallelism, + ) if err := bc.loadLastState(lastAcceptedHash); err != nil { return err diff --git a/core/state/prefetcher_database.go b/core/state/prefetcher_database.go index a51e9f22cd..8c49abba10 100644 --- a/core/state/prefetcher_database.go +++ b/core/state/prefetcher_database.go @@ -12,12 +12,21 @@ import ( "github.com/ethereum/go-ethereum/log" ) -// ForPrefetchingOnly returns a new database that is only suitable for prefetching -// operations. It will not be safe to use for any other operations. -// Close must be called on the returned database when it is no longer needed -// to wait on all spawned goroutines. -func (*cachingDB) ForPrefetchingOnly(db Database, maxConcurrency int) Database { - return newPrefetcherDatabase(db, maxConcurrency) +type withPrefetcherDB interface { + PrefetcherDB() Database +} + +type withPrefetcher struct { + Database + maxConcurrency int +} + +func (db *withPrefetcher) PrefetcherDB() Database { + return newPrefetcherDatabase(db.Database, db.maxConcurrency) +} + +func WithPrefetcher(db Database, maxConcurrency int) Database { + return &withPrefetcher{db, maxConcurrency} } type prefetcherDatabase struct { @@ -29,9 +38,8 @@ type prefetcherDatabase struct { func newPrefetcherDatabase(db Database, maxConcurrency int) *prefetcherDatabase { return &prefetcherDatabase{ - Database: db, - maxConcurrency: maxConcurrency, - workers: utils.NewBoundedWorkers(maxConcurrency), + Database: db, + workers: utils.NewBoundedWorkers(maxConcurrency), } } diff --git a/core/state/statedb.go b/core/state/statedb.go index 9b9aafc75c..cd759e1bf3 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -204,20 +204,13 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St // StartPrefetcher initializes a new trie prefetcher to pull in nodes from the // state trie concurrently while the state is mutated so that when we reach the // commit phase, most of the needed data is already hot. -func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) { +func (s *StateDB) StartPrefetcher(namespace string) { if s.prefetcher != nil { s.prefetcher.close() s.prefetcher = nil } if s.snap != nil { - db := s.db - type prefetchingDB interface { - ForPrefetchingOnly(db Database, maxConcurrency int) Database - } - if p, ok := db.(prefetchingDB); ok { - db = p.ForPrefetchingOnly(db, maxConcurrency) - } - s.prefetcher = newTriePrefetcher(db, s.originalRoot, namespace) + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) } } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index dfc2c33a0f..d4675477fd 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -62,6 +62,9 @@ type triePrefetcher struct { } func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { + if pdb, ok := db.(withPrefetcherDB); ok { + db = pdb.PrefetcherDB() + } prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ db: db, diff --git a/miner/worker.go b/miner/worker.go index ec2bd60fca..3fbd3111dc 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -268,7 +268,7 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre if err != nil { return nil, err } - state.StartPrefetcher("miner", w.eth.BlockChain().CacheConfig().TriePrefetcherParallelism) + state.StartPrefetcher("miner") return &environment{ signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), state: state,