Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

all: rework trie committer #25320

Merged
merged 13 commits into from
Aug 4, 2022
6 changes: 3 additions & 3 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,7 +1244,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error {

// writeBlockWithState writes block, metadata and corresponding state data to the
// database.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB) error {
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) error {
// Calculate the total difficulty of the block
ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1)
if ptd == nil {
Expand Down Expand Up @@ -1339,7 +1339,7 @@ func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types
// writeBlockAndSetHead is the internal implementation of WriteBlockAndSetHead.
// This function expects the chain mutex to be held.
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
if err := bc.writeBlockWithState(block, receipts, logs, state); err != nil {
if err := bc.writeBlockWithState(block, receipts, state); err != nil {
return NonStatTy, err
}
currentBlock := bc.CurrentBlock()
Expand Down Expand Up @@ -1703,7 +1703,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool)
var status WriteStatus
if !setHead {
// Don't set the head, only insert the block
err = bc.writeBlockWithState(block, receipts, logs, statedb)
err = bc.writeBlockWithState(block, receipts, statedb)
} else {
status, err = bc.writeBlockAndSetHead(block, receipts, logs, statedb, false)
}
Expand Down
10 changes: 7 additions & 3 deletions core/state/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,13 @@ type Trie interface {
// can be used even if the trie doesn't have one.
Hash() common.Hash

// Commit writes all nodes to the trie's memory database, tracking the internal
// and external (for account tries) references.
Commit(onleaf trie.LeafCallback) (common.Hash, int, error)
// Commit collects all dirty nodes in the trie and replace them with the
// corresponding node hash. All collected nodes(including dirty leaves if
// collectLeaf is true) will be encapsulated into a nodeset for return.
// The returned nodeset can be nil if the trie is clean(nothing to commit).
// Once the trie is committed, it's not usable anymore. A new trie must
// be created with new root and updated trie database for following usage
Commit(collectLeaf bool) (common.Hash, *trie.NodeSet, error)

// NodeIterator returns an iterator that returns nodes of the trie. Iteration
// starts at the key after the given start key.
Expand Down
12 changes: 6 additions & 6 deletions core/state/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package state
import "github.com/ethereum/go-ethereum/metrics"

var (
accountUpdatedMeter = metrics.NewRegisteredMeter("state/update/account", nil)
storageUpdatedMeter = metrics.NewRegisteredMeter("state/update/storage", nil)
accountDeletedMeter = metrics.NewRegisteredMeter("state/delete/account", nil)
storageDeletedMeter = metrics.NewRegisteredMeter("state/delete/storage", nil)
accountCommittedMeter = metrics.NewRegisteredMeter("state/commit/account", nil)
storageCommittedMeter = metrics.NewRegisteredMeter("state/commit/storage", nil)
accountUpdatedMeter = metrics.NewRegisteredMeter("state/update/account", nil)
storageUpdatedMeter = metrics.NewRegisteredMeter("state/update/storage", nil)
accountDeletedMeter = metrics.NewRegisteredMeter("state/delete/account", nil)
storageDeletedMeter = metrics.NewRegisteredMeter("state/delete/storage", nil)
accountTrieCommittedMeter = metrics.NewRegisteredMeter("state/trie/account", nil)
storageTriesCommittedMeter = metrics.NewRegisteredMeter("state/trie/storage", nil)
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
)
5 changes: 4 additions & 1 deletion core/state/snapshot/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,10 @@ func (dl *diskLayer) generateRange(ctx *generatorContext, owner common.Hash, roo
for i, key := range result.keys {
snapTrie.Update(key, result.vals[i])
}
root, _, _ := snapTrie.Commit(nil)
root, nodes, _ := snapTrie.Commit(false)
if nodes != nil {
snapTrieDb.Update(trie.NewWithNodeSet(nodes))
}
snapTrieDb.Commit(root, false, nil)
}
// Construct the trie for state iteration, reuse the trie
Expand Down
31 changes: 21 additions & 10 deletions core/state/snapshot/generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type testHelper struct {
diskdb ethdb.Database
triedb *trie.Database
accTrie *trie.SecureTrie
nodes *trie.MergedNodeSet
}

func newHelper() *testHelper {
Expand All @@ -153,6 +154,7 @@ func newHelper() *testHelper {
diskdb: diskdb,
triedb: triedb,
accTrie: accTrie,
nodes: trie.NewMergedNodeSet(),
}
}

Expand Down Expand Up @@ -184,17 +186,22 @@ func (t *testHelper) makeStorageTrie(stateRoot, owner common.Hash, keys []string
for i, k := range keys {
stTrie.Update([]byte(k), []byte(vals[i]))
}
var root common.Hash
if !commit {
root = stTrie.Hash()
} else {
root, _, _ = stTrie.Commit(nil)
return stTrie.Hash().Bytes()
}
root, nodes, _ := stTrie.Commit(false)
if nodes != nil {
t.nodes.Merge(nodes)
}
return root.Bytes()
}

func (t *testHelper) Commit() common.Hash {
root, _, _ := t.accTrie.Commit(nil)
root, nodes, _ := t.accTrie.Commit(false)
if nodes != nil {
t.nodes.Merge(nodes)
}
t.triedb.Update(t.nodes)
t.triedb.Commit(root, false, nil)
return root
}
Expand Down Expand Up @@ -378,7 +385,7 @@ func TestGenerateCorruptAccountTrie(t *testing.T) {
helper.addTrieAccount("acc-2", &Account{Balance: big.NewInt(2), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
helper.addTrieAccount("acc-3", &Account{Balance: big.NewInt(3), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()}) // 0x19ead688e907b0fab07176120dceec244a72aff2f0aa51e8b827584e378772f4

root, _, _ := helper.accTrie.Commit(nil) // Root: 0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978
root, _, _ := helper.accTrie.Commit(false) // Root: 0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978

// Delete an account trie leaf and ensure the generator chokes
helper.triedb.Commit(root, false, nil)
Expand Down Expand Up @@ -413,7 +420,7 @@ func TestGenerateMissingStorageTrie(t *testing.T) {
helper.addTrieAccount("acc-2", &Account{Balance: big.NewInt(2), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
stRoot = helper.makeStorageTrie(common.Hash{}, hashData([]byte("acc-3")), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &Account{Balance: big.NewInt(3), Root: stRoot, CodeHash: emptyCode.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2
root, _, _ := helper.accTrie.Commit(nil)
root, _, _ := helper.accTrie.Commit(false)

// We can only corrupt the disk database, so flush the tries out
helper.triedb.Reference(
Expand Down Expand Up @@ -458,7 +465,7 @@ func TestGenerateCorruptStorageTrie(t *testing.T) {
stRoot = helper.makeStorageTrie(common.Hash{}, hashData([]byte("acc-3")), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &Account{Balance: big.NewInt(3), Root: stRoot, CodeHash: emptyCode.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2

root, _, _ := helper.accTrie.Commit(nil)
root, _, _ := helper.accTrie.Commit(false)

// We can only corrupt the disk database, so flush the tries out
helper.triedb.Reference(
Expand Down Expand Up @@ -825,10 +832,12 @@ func populateDangling(disk ethdb.KeyValueStore) {
// This test will populate some dangling storages to see if they can be cleaned up.
func TestGenerateCompleteSnapshotWithDanglingStorage(t *testing.T) {
var helper = newHelper()
stRoot := helper.makeStorageTrie(common.Hash{}, common.Hash{}, []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)

stRoot := helper.makeStorageTrie(common.Hash{}, hashData([]byte("acc-1")), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-1", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()})
helper.addAccount("acc-2", &Account{Balance: big.NewInt(1), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()})

helper.makeStorageTrie(common.Hash{}, hashData([]byte("acc-3")), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-3", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()})

helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
Expand Down Expand Up @@ -858,10 +867,12 @@ func TestGenerateCompleteSnapshotWithDanglingStorage(t *testing.T) {
// This test will populate some dangling storages to see if they can be cleaned up.
func TestGenerateBrokenSnapshotWithDanglingStorage(t *testing.T) {
var helper = newHelper()
stRoot := helper.makeStorageTrie(common.Hash{}, common.Hash{}, []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)

stRoot := helper.makeStorageTrie(common.Hash{}, hashData([]byte("acc-1")), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-1", &Account{Balance: big.NewInt(1), Root: stRoot, CodeHash: emptyCode.Bytes()})
helper.addTrieAccount("acc-2", &Account{Balance: big.NewInt(2), Root: emptyRoot.Bytes(), CodeHash: emptyCode.Bytes()})

helper.makeStorageTrie(common.Hash{}, hashData([]byte("acc-3")), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &Account{Balance: big.NewInt(3), Root: stRoot, CodeHash: emptyCode.Bytes()})

populateDangling(helper.diskdb)
Expand Down
11 changes: 6 additions & 5 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)

var emptyCodeHash = crypto.Keccak256(nil)
Expand Down Expand Up @@ -375,23 +376,23 @@ func (s *stateObject) updateRoot(db Database) {

// CommitTrie the storage trie of the object to db.
// This updates the trie root.
func (s *stateObject) CommitTrie(db Database) (int, error) {
func (s *stateObject) CommitTrie(db Database) (*trie.NodeSet, error) {
// If nothing changed, don't bother with hashing anything
if s.updateTrie(db) == nil {
return 0, nil
return nil, nil
}
if s.dbErr != nil {
return 0, s.dbErr
return nil, s.dbErr
}
// Track the amount of time wasted on committing the storage trie
if metrics.EnabledExpensive {
defer func(start time.Time) { s.db.StorageCommits += time.Since(start) }(time.Now())
}
root, committed, err := s.trie.Commit(nil)
root, nodes, err := s.trie.Commit(false)
if err == nil {
s.data.Root = root
}
return committed, err
return nodes, err
}

// AddBalance adds amount to s's balance.
Expand Down
47 changes: 28 additions & 19 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ func (s *StateDB) GetRefund() uint64 {
return s.refund
}

// Finalise finalises the state by removing the s destructed objects and clears
// Finalise finalises the state by removing the destructed objects and clears
// the journal as well as the refunds. Finalise, however, will not push any updates
// into the tries just yet. Only IntermediateRoot or Commit will do that.
func (s *StateDB) Finalise(deleteEmptyObjects bool) {
Expand Down Expand Up @@ -844,7 +844,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Although naively it makes sense to retrieve the account trie and then do
// the contract storage and account updates sequentially, that short circuits
// the account prefetcher. Instead, let's process all the storage updates
// first, giving the account prefeches just a few more milliseconds of time
// first, giving the account prefetches just a few more milliseconds of time
// to pull useful data from disk.
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
Expand Down Expand Up @@ -907,7 +907,11 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
s.IntermediateRoot(deleteEmptyObjects)

// Commit objects to the trie, measuring the elapsed time
var storageCommitted int
var (
accountTrieNodes int
storageTrieNodes int
nodes = trie.NewMergedNodeSet()
)
codeWriter := s.db.TrieDB().DiskDB().NewBatch()
for addr := range s.stateObjectsDirty {
if obj := s.stateObjects[addr]; !obj.deleted {
Expand All @@ -917,11 +921,17 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
obj.dirtyCode = false
}
// Write any storage changes in the state object to its storage trie
committed, err := obj.CommitTrie(s.db)
set, err := obj.CommitTrie(s.db)
if err != nil {
return common.Hash{}, err
}
storageCommitted += committed
// Merge the dirty nodes of storage trie into global set
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, err
}
storageTrieNodes += set.Len()
}
}
}
if len(s.stateObjectsDirty) > 0 {
Expand All @@ -937,30 +947,26 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
if metrics.EnabledExpensive {
start = time.Now()
}
// The onleaf func is called _serially_, so we can reuse the same account
// for unmarshalling every time.
var account types.StateAccount
root, accountCommitted, err := s.trie.Commit(func(_ [][]byte, _ []byte, leaf []byte, parent common.Hash, _ []byte) error {
if err := rlp.DecodeBytes(leaf, &account); err != nil {
return nil
}
if account.Root != emptyRoot {
s.db.TrieDB().Reference(account.Root, parent)
}
return nil
})
root, set, err := s.trie.Commit(true)
if err != nil {
return common.Hash{}, err
}
// Merge the dirty nodes of account trie into global set
if set != nil {
if err := nodes.Merge(set); err != nil {
return common.Hash{}, err
}
accountTrieNodes = set.Len()
}
if metrics.EnabledExpensive {
s.AccountCommits += time.Since(start)

accountUpdatedMeter.Mark(int64(s.AccountUpdated))
storageUpdatedMeter.Mark(int64(s.StorageUpdated))
accountDeletedMeter.Mark(int64(s.AccountDeleted))
storageDeletedMeter.Mark(int64(s.StorageDeleted))
accountCommittedMeter.Mark(int64(accountCommitted))
storageCommittedMeter.Mark(int64(storageCommitted))
accountTrieCommittedMeter.Mark(int64(accountTrieNodes))
storageTriesCommittedMeter.Mark(int64(storageTrieNodes))
s.AccountUpdated, s.AccountDeleted = 0, 0
s.StorageUpdated, s.StorageDeleted = 0, 0
}
Expand All @@ -984,6 +990,9 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
}
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
}
if err := s.db.TrieDB().Update(nodes); err != nil {
return common.Hash{}, err
}
s.originalRoot = root
return root, err
}
Expand Down
Loading