Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(trie): use direct Merkle value for database keys #2725

Merged
merged 2 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dot/state/offline_pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (p *OfflinePruner) SetBloomFilter() (err error) {
}

latestBlockNum := header.Number
keys := make(map[common.Hash]struct{})
merkleValues := make(map[string]struct{})

logger.Infof("Latest block number is %d", latestBlockNum)

Expand All @@ -121,7 +121,7 @@ func (p *OfflinePruner) SetBloomFilter() (err error) {
return err
}

tr.PopulateNodeHashes(tr.RootNode(), keys)
tr.PopulateMerkleValues(tr.RootNode(), merkleValues)

// get parent header of current block
header, err = p.blockState.GetHeader(header.ParentHash)
Expand All @@ -131,14 +131,14 @@ func (p *OfflinePruner) SetBloomFilter() (err error) {
blockNum = header.Number
}

for key := range keys {
err = p.bloom.put(key.ToBytes())
for key := range merkleValues {
err = p.bloom.put([]byte(key))
if err != nil {
return err
}
}

logger.Infof("Total keys added in bloom filter: %d", len(keys))
logger.Infof("Total keys added in bloom filter: %d", len(merkleValues))
return nil
}

Expand Down
73 changes: 37 additions & 36 deletions dot/state/pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,34 @@ type Config struct {

// Pruner is implemented by FullNode and ArchiveNode.
type Pruner interface {
StoreJournalRecord(deletedHashesSet, insertedHashesSet map[common.Hash]struct{},
StoreJournalRecord(deletedMerkleValues, insertedMerkleValues map[string]struct{},
blockHash common.Hash, blockNum int64) error
}

// ArchiveNode is a no-op since we don't prune nodes in archive mode.
type ArchiveNode struct{}

// StoreJournalRecord for archive node doesn't do anything.
func (*ArchiveNode) StoreJournalRecord(_, _ map[common.Hash]struct{},
func (*ArchiveNode) StoreJournalRecord(_, _ map[string]struct{},
_ common.Hash, _ int64) error {
return nil
}

type deathRecord struct {
blockHash common.Hash
deletedKeys map[common.Hash]int64 // Mapping from deleted key hash to block number.
blockHash common.Hash
deletedMerkleValueToBlockNumber map[string]int64
}

type deathRow []*deathRecord

// FullNode stores state trie diff and allows online state trie pruning
type FullNode struct {
logger log.LeveledLogger
deathList []deathRow
storageDB chaindb.Database
journalDB chaindb.Database
deathIndex map[common.Hash]int64 // Mapping from deleted key hash to block number.
logger log.LeveledLogger
deathList []deathRow
storageDB chaindb.Database
journalDB chaindb.Database
// deathIndex is the mapping from deleted node Merkle value to block number.
deathIndex map[string]int64
// pendingNumber is the block number to be pruned.
// Initial value is set to 1 and is incremented after every block pruning.
pendingNumber int64
Expand All @@ -88,31 +89,31 @@ type FullNode struct {
type journalRecord struct {
// blockHash of the block corresponding to journal record
blockHash common.Hash
// Hash of keys that are inserted into state trie of the block
insertedHashesSet map[common.Hash]struct{}
// Hash of keys that are deleted from state trie of the block
deletedHashesSet map[common.Hash]struct{}
// Merkle values of nodes inserted in the state trie of the block
insertedMerkleValues map[string]struct{}
// Merkle values of nodes deleted from the state trie of the block
deletedMerkleValues map[string]struct{}
}

type journalKey struct {
blockNum int64
blockHash common.Hash
}

func newJournalRecord(hash common.Hash, insertedHashesSet,
deletedHashesSet map[common.Hash]struct{}) *journalRecord {
func newJournalRecord(hash common.Hash, insertedMerkleValues,
deletedMerkleValues map[string]struct{}) *journalRecord {
return &journalRecord{
blockHash: hash,
insertedHashesSet: insertedHashesSet,
deletedHashesSet: deletedHashesSet,
blockHash: hash,
insertedMerkleValues: insertedMerkleValues,
deletedMerkleValues: deletedMerkleValues,
}
}

// NewFullNode creates a Pruner for full node.
func NewFullNode(db, storageDB chaindb.Database, retainBlocks int64, l log.LeveledLogger) (Pruner, error) {
p := &FullNode{
deathList: make([]deathRow, 0),
deathIndex: make(map[common.Hash]int64),
deathIndex: make(map[string]int64),
storageDB: storageDB,
journalDB: chaindb.NewTable(db, journalPrefix),
retainBlocks: retainBlocks,
Expand Down Expand Up @@ -140,9 +141,9 @@ func NewFullNode(db, storageDB chaindb.Database, retainBlocks int64, l log.Level
}

// StoreJournalRecord stores journal record into DB and add deathRow into deathList
func (p *FullNode) StoreJournalRecord(deletedHashesSet, insertedHashesSet map[common.Hash]struct{},
func (p *FullNode) StoreJournalRecord(deletedMerkleValues, insertedMerkleValues map[string]struct{},
blockHash common.Hash, blockNum int64) error {
jr := newJournalRecord(blockHash, insertedHashesSet, deletedHashesSet)
jr := newJournalRecord(blockHash, insertedMerkleValues, deletedMerkleValues)

key := &journalKey{blockNum, blockHash}
err := p.storeJournal(key, jr)
Expand All @@ -168,13 +169,13 @@ func (p *FullNode) addDeathRow(jr *journalRecord, blockNum int64) {
return
}

p.processInsertedKeys(jr.insertedHashesSet, jr.blockHash)
p.processInsertedKeys(jr.insertedMerkleValues, jr.blockHash)

// add deleted keys from journal to death index
deletedKeys := make(map[common.Hash]int64, len(jr.deletedHashesSet))
for k := range jr.deletedHashesSet {
// add deleted node Merkle values from journal to death index
deletedMerkleValueToBlockNumber := make(map[string]int64, len(jr.deletedMerkleValues))
for k := range jr.deletedMerkleValues {
p.deathIndex[k] = blockNum
deletedKeys[k] = blockNum
deletedMerkleValueToBlockNumber[k] = blockNum
}

blockIndex := blockNum - p.pendingNumber
Expand All @@ -183,25 +184,25 @@ func (p *FullNode) addDeathRow(jr *journalRecord, blockNum int64) {
}

record := &deathRecord{
blockHash: jr.blockHash,
deletedKeys: deletedKeys,
blockHash: jr.blockHash,
deletedMerkleValueToBlockNumber: deletedMerkleValueToBlockNumber,
}

// add deathRow to deathList
p.deathList[blockIndex] = append(p.deathList[blockIndex], record)
}

// Remove re-inserted keys
func (p *FullNode) processInsertedKeys(insertedHashesSet map[common.Hash]struct{}, blockHash common.Hash) {
for k := range insertedHashesSet {
func (p *FullNode) processInsertedKeys(insertedMerkleValues map[string]struct{}, blockHash common.Hash) {
for k := range insertedMerkleValues {
num, ok := p.deathIndex[k]
if !ok {
continue
}
records := p.deathList[num-p.pendingNumber]
for _, v := range records {
if v.blockHash == blockHash {
delete(v.deletedKeys, k)
delete(v.deletedMerkleValueToBlockNumber, k)
}
}
delete(p.deathIndex, k)
Expand Down Expand Up @@ -229,14 +230,14 @@ func (p *FullNode) start() {

sdbBatch := p.storageDB.NewBatch()
for _, record := range row {
err := p.deleteKeys(sdbBatch, record.deletedKeys)
err := p.deleteKeys(sdbBatch, record.deletedMerkleValueToBlockNumber)
if err != nil {
p.logger.Warnf("failed to prune keys for block number %d: %s", blockNum, err)
sdbBatch.Reset()
return
}

for k := range record.deletedKeys {
for k := range record.deletedMerkleValueToBlockNumber {
delete(p.deathIndex, k)
}
}
Expand Down Expand Up @@ -373,9 +374,9 @@ func (p *FullNode) getLastPrunedIndex() (int64, error) {
return blockNum, nil
}

func (*FullNode) deleteKeys(b chaindb.Batch, nodesHash map[common.Hash]int64) error {
for k := range nodesHash {
err := b.Del(k.ToBytes())
func (*FullNode) deleteKeys(b chaindb.Batch, deletedMerkleValueToBlockNumber map[string]int64) error {
for merkleValue := range deletedMerkleValueToBlockNumber {
err := b.Del([]byte(merkleValue))
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ func (s *StorageState) StoreTrie(ts *rtstorage.TrieState, header *types.Header)
}

if header != nil {
insertedNodeHashes, err := ts.GetInsertedNodeHashes()
insertedMerkleValues, err := ts.GetInsertedMerkleValues()
if err != nil {
return fmt.Errorf("failed to get state trie inserted keys: block %s %w", header.Hash(), err)
}

deletedNodeHashes := ts.GetDeletedNodeHashes()
err = s.pruner.StoreJournalRecord(deletedNodeHashes, insertedNodeHashes, header.Hash(), int64(header.Number))
deletedMerkleValues := ts.GetDeletedMerkleValues()
err = s.pruner.StoreJournalRecord(deletedMerkleValues, insertedMerkleValues, header.Hash(), int64(header.Number))
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions lib/runtime/storage/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,18 +271,18 @@ func (s *TrieState) LoadCodeHash() (common.Hash, error) {
return common.Blake2bHash(code)
}

// GetInsertedNodeHashes returns a set of hashes of all nodes
// that were inserted into state trie since the last block produced.
func (s *TrieState) GetInsertedNodeHashes() (hashesSet map[common.Hash]struct{}, err error) {
// GetInsertedMerkleValues returns the set of all node Merkle value inserted
// into the state trie since the last block produced.
func (s *TrieState) GetInsertedMerkleValues() (merkleValues map[string]struct{}, err error) {
s.lock.RLock()
defer s.lock.RUnlock()
return s.t.GetInsertedNodeHashes()
return s.t.GetInsertedMerkleValues()
}

// GetDeletedNodeHashes returns the hash of nodes that were deleted
// GetDeletedMerkleValues returns the set of all node Merkle values deleted
// from the state trie since the last block produced.
func (s *TrieState) GetDeletedNodeHashes() (hashesSet map[common.Hash]struct{}) {
func (s *TrieState) GetDeletedMerkleValues() (merkleValues map[string]struct{}) {
s.lock.RLock()
defer s.lock.RUnlock()
return s.t.GetDeletedNodeHashes()
return s.t.GetDeletedMerkleValues()
}
47 changes: 22 additions & 25 deletions lib/trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ func (t *Trie) loadNode(db Database, n *Node) error {
return nil
}

// PopulateNodeHashes writes hashes of each children of the node given
// as keys to the map hashesSet.
func (t *Trie) PopulateNodeHashes(n *Node, hashesSet map[common.Hash]struct{}) {
// PopulateMerkleValues writes the Merkle value of each children of the node given
// as keys to the map merkleValues.
func (t *Trie) PopulateMerkleValues(n *Node, merkleValues map[string]struct{}) {
if n.Kind() != node.Branch {
return
}
Expand All @@ -198,10 +198,9 @@ func (t *Trie) PopulateNodeHashes(n *Node, hashesSet map[common.Hash]struct{}) {
continue
}

hash := common.BytesToHash(child.MerkleValue)
hashesSet[hash] = struct{}{}
merkleValues[string(child.MerkleValue)] = struct{}{}

t.PopulateNodeHashes(child, hashesSet)
t.PopulateMerkleValues(child, merkleValues)
}
}

Expand Down Expand Up @@ -354,20 +353,18 @@ func (t *Trie) writeDirtyNode(db chaindb.Batch, n *Node) (err error) {
return nil
}

// GetInsertedNodeHashes returns a set of hashes with all
// the hashes of all nodes that were inserted in the state trie
// since the last snapshot.
// We need to compute the hash values of each newly inserted node.
func (t *Trie) GetInsertedNodeHashes() (hashesSet map[common.Hash]struct{}, err error) {
hashesSet = make(map[common.Hash]struct{})
err = t.getInsertedNodeHashesAtNode(t.root, hashesSet)
// GetInsertedMerkleValues returns the set of node Merkle values
// for each node that was inserted in the state trie since the last snapshot.
func (t *Trie) GetInsertedMerkleValues() (merkleValues map[string]struct{}, err error) {
merkleValues = make(map[string]struct{})
err = t.getInsertedNodeHashesAtNode(t.root, merkleValues)
if err != nil {
return nil, err
}
return hashesSet, nil
return merkleValues, nil
}

func (t *Trie) getInsertedNodeHashesAtNode(n *Node, hashes map[common.Hash]struct{}) (err error) {
func (t *Trie) getInsertedNodeHashesAtNode(n *Node, merkleValues map[string]struct{}) (err error) {
if n == nil || !n.Dirty {
return nil
}
Expand All @@ -384,7 +381,7 @@ func (t *Trie) getInsertedNodeHashesAtNode(n *Node, hashes map[common.Hash]struc
n.MerkleValue, err)
}

hashes[common.BytesToHash(merkleValue)] = struct{}{}
merkleValues[string(merkleValue)] = struct{}{}

if n.Kind() != node.Branch {
return nil
Expand All @@ -395,7 +392,7 @@ func (t *Trie) getInsertedNodeHashesAtNode(n *Node, hashes map[common.Hash]struc
continue
}

err := t.getInsertedNodeHashesAtNode(child, hashes)
err := t.getInsertedNodeHashesAtNode(child, merkleValues)
if err != nil {
// Note: do not wrap error since this is called recursively.
return err
Expand All @@ -405,13 +402,13 @@ func (t *Trie) getInsertedNodeHashesAtNode(n *Node, hashes map[common.Hash]struc
return nil
}

// GetDeletedNodeHashes returns a set of all the hashes of nodes that were
// deleted from the trie since the last snapshot was made.
// The returned set is a copy of the internal set to prevent data races.
func (t *Trie) GetDeletedNodeHashes() (hashesSet map[common.Hash]struct{}) {
hashesSet = make(map[common.Hash]struct{}, len(t.deletedKeys))
for k := range t.deletedKeys {
hashesSet[k] = struct{}{}
// GetDeletedMerkleValues returns a set of all the node Merkle values for each
// node that was deleted from the trie since the last snapshot was made.
// The returned set is a copy of the internal set to prevent data corruption.
func (t *Trie) GetDeletedMerkleValues() (merkleValues map[string]struct{}) {
merkleValues = make(map[string]struct{}, len(t.deletedMerkleValues))
for k := range t.deletedMerkleValues {
merkleValues[k] = struct{}{}
}
return hashesSet
return merkleValues
}
Loading