From cfaeac8edacac43a410a0ce417cf557a180cb0a0 Mon Sep 17 00:00:00 2001
From: rjl493456442
Date: Wed, 30 Mar 2022 02:37:35 +0800
Subject: [PATCH] core/state/snapshot: clean up the generation code (#24479)

---
 core/state/snapshot/generate.go | 235 +++++++++++++++++++-------------
 1 file changed, 137 insertions(+), 98 deletions(-)

diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go
index 9d74ca4d9b31..39d30a20c008 100644
--- a/core/state/snapshot/generate.go
+++ b/core/state/snapshot/generate.go
@@ -379,7 +379,7 @@ func (dl *diskLayer) proveRange(stats *generatorStats, root common.Hash, prefix
 type onStateCallback func(key []byte, val []byte, write bool, delete bool) error
 
 // generateRange generates the state segment with particular prefix. Generation can
-// either verify the correctness of existing state through rangeproof and skip
+// either verify the correctness of existing state through range-proof and skip
 // generation, or iterate trie to regenerate state on demand.
 func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string, origin []byte, max int, stats *generatorStats, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
 	// Use range prover to check the validity of the flat state in the range
@@ -532,66 +532,94 @@ func (dl *diskLayer) generateRange(root common.Hash, prefix []byte, kind string,
 	return !trieMore && !result.diskMore, last, nil
 }
 
-// generate is a background thread that iterates over the state and storage tries,
-// constructing the state snapshot. All the arguments are purely for statistics
-// gathering and logging, since the method surfs the blocks as they arrive, often
-// being restarted.
-func (dl *diskLayer) generate(stats *generatorStats) {
-	var (
-		accMarker    []byte
-		accountRange = accountCheckRange
-	)
-	if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
-		// Always reset the initial account range as 1
-		// whenever recover from the interruption.
-		accMarker, accountRange = dl.genMarker[:common.HashLength], 1
+// checkAndFlush checks whether an interruption signal has been received or the
+// batch size has exceeded the allowance.
+func (dl *diskLayer) checkAndFlush(current []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
+	var abort chan *generatorStats
+	select {
+	case abort = <-dl.genAbort:
+	default:
 	}
-	var (
-		batch     = dl.diskdb.NewBatch()
-		logged    = time.Now()
-		accOrigin = common.CopyBytes(accMarker)
-		abort     chan *generatorStats
-	)
-	stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
+	if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
+		if bytes.Compare(current, dl.genMarker) < 0 {
+			log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
+		}
+		// Flush out the batch regardless of whether it's empty or not.
+		// It's possible that all the states are recovered and the
+		// generation indeed makes progress.
+		journalProgress(batch, current, stats)
 
-	checkAndFlush := func(currentLocation []byte) error {
-		select {
-		case abort = <-dl.genAbort:
-		default:
+		if err := batch.Write(); err != nil {
+			return err
 		}
-		if batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
-			if bytes.Compare(currentLocation, dl.genMarker) < 0 {
-				log.Error("Snapshot generator went backwards",
-					"currentLocation", fmt.Sprintf("%x", currentLocation),
-					"genMarker", fmt.Sprintf("%x", dl.genMarker))
-			}
+		batch.Reset()
 
-			// Flush out the batch anyway no matter it's empty or not.
-			// It's possible that all the states are recovered and the
-			// generation indeed makes progress.
-			journalProgress(batch, currentLocation, stats)
+		dl.lock.Lock()
+		dl.genMarker = current
+		dl.lock.Unlock()
 
-			if err := batch.Write(); err != nil {
-				return err
-			}
-			batch.Reset()
+		if abort != nil {
+			stats.Log("Aborting state snapshot generation", dl.root, current)
+			return newAbortErr(abort) // bubble up an error for interruption
+		}
+	}
+	if time.Since(*logged) > 8*time.Second {
+		stats.Log("Generating state snapshot", dl.root, current)
+		*logged = time.Now()
+	}
+	return nil
+}
 
-			dl.lock.Lock()
-			dl.genMarker = currentLocation
-			dl.lock.Unlock()
+// generateStorages generates the missing storage slots of the given contract.
+// It's supposed to restart the generation from the given origin position.
+func generateStorages(dl *diskLayer, account common.Hash, storageRoot common.Hash, storeMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
+	onStorage := func(key []byte, val []byte, write bool, delete bool) error {
+		defer func(start time.Time) {
+			snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
+		}(time.Now())
 
-			if abort != nil {
-				stats.Log("Aborting state snapshot generation", dl.root, currentLocation)
-				return errors.New("aborted")
-			}
+		if delete {
+			rawdb.DeleteStorageSnapshot(batch, account, common.BytesToHash(key))
+			snapWipedStorageMeter.Mark(1)
+			return nil
+		}
+		if write {
+			rawdb.WriteStorageSnapshot(batch, account, common.BytesToHash(key), val)
+			snapGeneratedStorageMeter.Mark(1)
+		} else {
+			snapRecoveredStorageMeter.Mark(1)
 		}
-		if time.Since(logged) > 8*time.Second {
-			stats.Log("Generating state snapshot", dl.root, currentLocation)
-			logged = time.Now()
+		stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
+		stats.slots++
+
+		// If we've exceeded our batch allowance or termination was requested, flush to disk
+		if err := dl.checkAndFlush(append(account[:], key...), batch, stats, logged); err != nil {
+			return err
		}
 		return nil
 	}
+	// Loop for re-generating the missing storage slots.
+	var origin = common.CopyBytes(storeMarker)
+	for {
+		exhausted, last, err := dl.generateRange(storageRoot, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), "storage", origin, storageCheckRange, stats, onStorage, nil)
+		if err != nil {
+			return err // The procedure is aborted, either by external signal or internal error.
+		}
+		// Abort the procedure if the entire contract storage is generated
+		if exhausted {
+			break
+		}
+		if origin = increaseKey(last); origin == nil {
+			break // special case, the last is 0xffffffff...fff
+		}
+	}
+	return nil
+}
 
+// generateAccounts generates the missing snapshot accounts as well as their
+// storage slots in the main trie. It's supposed to restart the generation
+// from the given origin position.
+func generateAccounts(dl *diskLayer, accMarker []byte, batch ethdb.Batch, stats *generatorStats, logged *time.Time) error {
 	onAccount := func(key []byte, val []byte, write bool, delete bool) error {
 		var (
 			start = time.Now()
@@ -647,7 +675,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
 			marker = dl.genMarker[:]
 		}
 		// If we've exceeded our batch allowance or termination was requested, flush to disk
-		if err := checkAndFlush(marker); err != nil {
+		if err := dl.checkAndFlush(marker, batch, stats, logged); err != nil {
 			return err
 		}
 		// If the iterated account is the contract, create a further loop to
@@ -671,70 +699,67 @@ func (dl *diskLayer) generate(stats *generatorStats) {
 		if accMarker != nil && bytes.Equal(accountHash[:], accMarker) && len(dl.genMarker) > common.HashLength {
 			storeMarker = dl.genMarker[common.HashLength:]
 		}
-		onStorage := func(key []byte, val []byte, write bool, delete bool) error {
-			defer func(start time.Time) {
-				snapStorageWriteCounter.Inc(time.Since(start).Nanoseconds())
-			}(time.Now())
-
-			if delete {
-				rawdb.DeleteStorageSnapshot(batch, accountHash, common.BytesToHash(key))
-				snapWipedStorageMeter.Mark(1)
-				return nil
-			}
-			if write {
-				rawdb.WriteStorageSnapshot(batch, accountHash, common.BytesToHash(key), val)
-				snapGeneratedStorageMeter.Mark(1)
-			} else {
-				snapRecoveredStorageMeter.Mark(1)
-			}
-			stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
-			stats.slots++
-
-			// If we've exceeded our batch allowance or termination was requested, flush to disk
-			if err := checkAndFlush(append(accountHash[:], key...)); err != nil {
-				return err
-			}
-			return nil
-		}
-		var storeOrigin = common.CopyBytes(storeMarker)
-		for {
-			exhausted, last, err := dl.generateRange(acc.Root, append(rawdb.SnapshotStoragePrefix, accountHash.Bytes()...), "storage", storeOrigin, storageCheckRange, stats, onStorage, nil)
-			if err != nil {
-				return err
-			}
-			if exhausted {
-				break
-			}
-			if storeOrigin = increaseKey(last); storeOrigin == nil {
-				break // special case, the last is 0xffffffff...fff
-			}
+		if err := generateStorages(dl, accountHash, acc.Root, storeMarker, batch, stats, logged); err != nil {
+			return err
 		}
 	}
 	// Some account processed, unmark the marker
 	accMarker = nil
 	return nil
 }
-
-	// Global loop for regerating the entire state trie + all layered storage tries.
+	// Always reset the initial account range to 1 whenever recovering from an interruption.
+	var accountRange = accountCheckRange
+	if len(accMarker) > 0 {
+		accountRange = 1
+	}
+	// Global loop for re-generating the account snapshots + all layered storage snapshots.
+	origin := common.CopyBytes(accMarker)
 	for {
-		exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", accOrigin, accountRange, stats, onAccount, FullAccountRLP)
-		// The procedure it aborted, either by external signal or internal error
+		exhausted, last, err := dl.generateRange(dl.root, rawdb.SnapshotAccountPrefix, "account", origin, accountRange, stats, onAccount, FullAccountRLP)
 		if err != nil {
-			if abort == nil { // aborted by internal error, wait the signal
-				abort = <-dl.genAbort
-			}
-			abort <- stats
-			return
+			return err // The procedure is aborted, either by external signal or internal error.
 		}
 		// Abort the procedure if the entire snapshot is generated
 		if exhausted {
 			break
 		}
-		if accOrigin = increaseKey(last); accOrigin == nil {
+		if origin = increaseKey(last); origin == nil {
 			break // special case, the last is 0xffffffff...fff
 		}
 		accountRange = accountCheckRange
 	}
+	return nil
+}
+
+// generate is a background thread that iterates over the state and storage tries,
+// constructing the state snapshot. All the arguments are purely for statistics
+// gathering and logging, since the method surfs the blocks as they arrive, often
+// being restarted.
+func (dl *diskLayer) generate(stats *generatorStats) {
+	var accMarker []byte
+	if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
+		accMarker = dl.genMarker[:common.HashLength]
+	}
+	var (
+		batch  = dl.diskdb.NewBatch()
+		logged = time.Now()
+		abort  chan *generatorStats
+	)
+	stats.Log("Resuming state snapshot generation", dl.root, dl.genMarker)
+
+	// Generate the snapshot accounts from the point where they left off.
+	if err := generateAccounts(dl, accMarker, batch, stats, &logged); err != nil {
+		// Extract the received interruption signal if it exists
+		if aerr, ok := err.(*abortErr); ok {
+			abort = aerr.abort
+		}
+		// Aborted by internal error, wait for the signal
+		if abort == nil {
+			abort = <-dl.genAbort
+		}
+		abort <- stats
+		return
+	}
 	// Snapshot fully generated, set the marker to nil.
 	// Note even there is nothing to commit, persist the
 	// generator anyway to mark the snapshot is complete.
@@ -762,7 +787,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
 }
 
 // increaseKey increase the input key by one bit. Return nil if the entire
-// addition operation overflows,
+// addition operation overflows.
 func increaseKey(key []byte) []byte {
 	for i := len(key) - 1; i >= 0; i-- {
 		key[i]++
@@ -772,3 +797,17 @@ func increaseKey(key []byte) []byte {
 	}
 	return nil
 }
+
+// abortErr wraps an interruption signal, indicating that the generation
+// is aborted by an external process.
+type abortErr struct {
+	abort chan *generatorStats
+}
+
+func newAbortErr(abort chan *generatorStats) error {
+	return &abortErr{abort: abort}
+}
+
+func (err *abortErr) Error() string {
+	return "aborted"
+}
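
A few self-contained Go sketches of the patterns this patch leans on follow. The refactored checkAndFlush polls dl.genAbort with a select that has an empty default branch, Go's standard non-blocking receive; a minimal sketch of that pattern (the channel and function names here are illustrative, not part of the patch):

```go
package main

import "fmt"

// pollAbort sketches the non-blocking receive used by checkAndFlush:
// the select's default branch fires immediately when no abort request
// is pending, so the generator never blocks on the check itself.
func pollAbort(abort chan struct{}) bool {
	select {
	case <-abort:
		return true // a pending signal was consumed
	default:
		return false // nothing queued, keep generating
	}
}

func main() {
	abort := make(chan struct{}, 1)
	fmt.Println(pollAbort(abort)) // false: channel is empty
	abort <- struct{}{}
	fmt.Println(pollAbort(abort)) // true: a signal was waiting
}
```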
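The range loops in generateStorages and generateAccounts advance their origin with increaseKey, whose body appears in the final hunk above. A runnable sketch of its behaviour, with the function reproduced verbatim from the patch and an illustrative main:

```go
package main

import "fmt"

// increaseKey is reproduced from the final hunk of this patch: it
// increments the key by one, mutating it in place, and returns nil
// if the increment overflows (i.e. the key was already 0xff...ff).
func increaseKey(key []byte) []byte {
	for i := len(key) - 1; i >= 0; i-- {
		key[i]++
		if key[i] != 0x0 {
			return key
		}
	}
	return nil
}

func main() {
	fmt.Printf("%x\n", increaseKey([]byte{0x01, 0xff})) // 0200: the carry propagates
	// A nil result is the loops' "special case": the last key was the
	// maximum hash, so there is no next origin and the range is done.
	fmt.Println(increaseKey([]byte{0xff, 0xff}) == nil) // true
}
```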
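generateAccounts caps the first range at a single account when recovering from an interruption, so the half-processed account under the old marker is re-verified before full-sized ranges resume. A toy sketch of that rule, assuming an illustrative batch size of 128 (the real accountCheckRange constant is defined elsewhere in this file):

```go
package main

import "fmt"

// initialRange mirrors the recovery rule in generateAccounts: when a
// previous run left a marker behind, the first range is capped at one
// account before normal batch sizes resume. checkRange stands in for
// the file's accountCheckRange constant.
func initialRange(marker []byte, checkRange int) int {
	if len(marker) > 0 {
		return 1 // recovering: re-verify the account under the marker
	}
	return checkRange // fresh start: use the normal batch size
}

func main() {
	fmt.Println(initialRange(nil, 128))          // 128
	fmt.Println(initialRange([]byte{0xaa}, 128)) // 1
}
```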
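Finally, the new abortErr type is a sentinel error that carries the interrupter's channel up through generateAccounts, letting generate hand its stats back with a plain type assertion. A compilable sketch of the same pattern under simplified types (string stands in for *generatorStats; all names beyond abortErr are illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// abortErr mirrors the sentinel added in this patch: the error carries
// the channel the interrupter is waiting on for the generator's stats.
type abortErr struct {
	abort chan string // stand-in for chan *generatorStats
}

func (err *abortErr) Error() string { return "aborted" }

// step fails either because it was interrupted or from a plain error.
func step(interrupted bool) error {
	if interrupted {
		return &abortErr{abort: make(chan string, 1)}
	}
	return errors.New("disk failure")
}

func main() {
	for _, interrupted := range []bool{true, false} {
		err := step(interrupted)
		// The same type assertion generate() uses to recover the channel.
		if aerr, ok := err.(*abortErr); ok {
			aerr.abort <- "stats handed to interrupter"
			fmt.Println(<-aerr.abort)
		} else {
			fmt.Println("internal error:", err)
		}
	}
}
```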