Skip to content

Commit

Permalink
handle passing of concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
darioush committed Nov 18, 2024
1 parent 219cc16 commit 5640be0
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 22 deletions.
9 changes: 6 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {
blockStateInitTimer.Inc(time.Since(substart).Milliseconds())

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
statedb.StartPrefetcher("chain")
activeState = statedb

// Process block using the parent state as reference point
Expand Down Expand Up @@ -1675,7 +1675,7 @@ func (bc *BlockChain) reprocessBlock(parent *types.Block, current *types.Block)
}

// Enable prefetching to pull in trie node paths while processing transactions
statedb.StartPrefetcher("chain", bc.cacheConfig.TriePrefetcherParallelism)
statedb.StartPrefetcher("chain")
defer func() {
statedb.StopPrefetcher()
}()
Expand Down Expand Up @@ -2073,7 +2073,10 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
bc.hc.SetCurrentHeader(block.Header())

lastAcceptedHash := block.Hash()
bc.stateCache = state.NewDatabaseWithNodeDB(bc.db, bc.triedb)
bc.stateCache = state.WithPrefetcher(
state.NewDatabaseWithNodeDB(bc.db, bc.triedb),
bc.cacheConfig.TriePrefetcherParallelism,
)

if err := bc.loadLastState(lastAcceptedHash); err != nil {
return err
Expand Down
26 changes: 17 additions & 9 deletions core/state/prefetcher_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,21 @@ import (
"github.com/ethereum/go-ethereum/log"
)

// ForPrefetchingOnly returns a new database that is only suitable for prefetching
// operations. It will not be safe to use for any other operations.
// Close must be called on the returned database when it is no longer needed
// to wait on all spawned goroutines.
func (*cachingDB) ForPrefetchingOnly(db Database, maxConcurrency int) Database {
return newPrefetcherDatabase(db, maxConcurrency)
type withPrefetcherDB interface {
PrefetcherDB() Database
}

type withPrefetcher struct {
Database
maxConcurrency int
}

func (db *withPrefetcher) PrefetcherDB() Database {
return newPrefetcherDatabase(db.Database, db.maxConcurrency)
}

func WithPrefetcher(db Database, maxConcurrency int) Database {
return &withPrefetcher{db, maxConcurrency}
}

type prefetcherDatabase struct {
Expand All @@ -29,9 +38,8 @@ type prefetcherDatabase struct {

func newPrefetcherDatabase(db Database, maxConcurrency int) *prefetcherDatabase {
return &prefetcherDatabase{
Database: db,
maxConcurrency: maxConcurrency,
workers: utils.NewBoundedWorkers(maxConcurrency),
Database: db,
workers: utils.NewBoundedWorkers(maxConcurrency),
}
}

Expand Down
11 changes: 2 additions & 9 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,20 +204,13 @@ func NewWithSnapshot(root common.Hash, db Database, snap snapshot.Snapshot) (*St
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string, maxConcurrency int) {
func (s *StateDB) StartPrefetcher(namespace string) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
db := s.db
type prefetchingDB interface {
ForPrefetchingOnly(db Database, maxConcurrency int) Database
}
if p, ok := db.(prefetchingDB); ok {
db = p.ForPrefetchingOnly(db, maxConcurrency)
}
s.prefetcher = newTriePrefetcher(db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
}
}

Expand Down
3 changes: 3 additions & 0 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type triePrefetcher struct {
}

func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
if pdb, ok := db.(withPrefetcherDB); ok {
db = pdb.PrefetcherDB()
}
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
Expand Down
2 changes: 1 addition & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (w *worker) createCurrentEnvironment(predicateContext *precompileconfig.Pre
if err != nil {
return nil, err
}
state.StartPrefetcher("miner", w.eth.BlockChain().CacheConfig().TriePrefetcherParallelism)
state.StartPrefetcher("miner")
return &environment{
signer: types.MakeSigner(w.chainConfig, header.Number, header.Time),
state: state,
Expand Down

0 comments on commit 5640be0

Please sign in to comment.