Skip to content

Commit

Permalink
core, eth: merge snap-sync chain download progress logs (#26676)
Browse files Browse the repository at this point in the history
  • Loading branch information
karalabe authored Feb 21, 2023
1 parent 7d4db69 commit 90d2551
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 42 deletions.
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
log.Info("Imported new block receipts", context...)
log.Debug("Imported new block receipts", context...)

return 0, nil
}
Expand Down
2 changes: 1 addition & 1 deletion core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time,
if res.ignored > 0 {
context = append(context, []interface{}{"ignored", res.ignored}...)
}
log.Info("Imported new block headers", context...)
log.Debug("Imported new block headers", context...)
return res.status, err
}

Expand Down
26 changes: 13 additions & 13 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
var data []byte
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
data, _ = reader.Ancient(chainFreezerHashTable, number)
data, _ = reader.Ancient(ChainFreezerHashTable, number)
if len(data) == 0 {
// Get it by hash from leveldb
data, _ = db.Get(headerHashKey(number))
Expand Down Expand Up @@ -334,7 +334,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu
}
// read remaining from ancients
max := count * 700
data, err := db.AncientRange(chainFreezerHeaderTable, i+1-count, count, max)
data, err := db.AncientRange(ChainFreezerHeaderTable, i+1-count, count, max)
if err == nil && uint64(len(data)) == count {
// the data is on the order [h, h+1, .., n] -- reordering needed
for i := range data {
Expand All @@ -351,7 +351,7 @@ func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValu
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ = reader.Ancient(chainFreezerHeaderTable, number)
data, _ = reader.Ancient(ChainFreezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return nil
}
Expand Down Expand Up @@ -427,7 +427,7 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
// isCanon is an internal utility method, to check whether the given number/hash
// is part of the ancient (canon) set.
func isCanon(reader ethdb.AncientReaderOp, number uint64, hash common.Hash) bool {
h, err := reader.Ancient(chainFreezerHashTable, number)
h, err := reader.Ancient(ChainFreezerHashTable, number)
if err != nil {
return false
}
Expand All @@ -443,7 +443,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerBodiesTable, number)
data, _ = reader.Ancient(ChainFreezerBodiesTable, number)
return nil
}
// If not, try reading from leveldb
Expand All @@ -458,7 +458,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue {
var data []byte
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
data, _ = reader.Ancient(chainFreezerBodiesTable, number)
data, _ = reader.Ancient(ChainFreezerBodiesTable, number)
if len(data) > 0 {
return nil
}
Expand Down Expand Up @@ -526,7 +526,7 @@ func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerDifficultyTable, number)
data, _ = reader.Ancient(ChainFreezerDifficultyTable, number)
return nil
}
// If not, try reading from leveldb
Expand Down Expand Up @@ -586,7 +586,7 @@ func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawVa
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerReceiptTable, number)
data, _ = reader.Ancient(ChainFreezerReceiptTable, number)
return nil
}
// If not, try reading from leveldb
Expand Down Expand Up @@ -787,19 +787,19 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts

func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
num := block.NumberU64()
if err := op.AppendRaw(chainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
return fmt.Errorf("can't add block %d hash: %v", num, err)
}
if err := op.Append(chainFreezerHeaderTable, num, header); err != nil {
if err := op.Append(ChainFreezerHeaderTable, num, header); err != nil {
return fmt.Errorf("can't append block header %d: %v", num, err)
}
if err := op.Append(chainFreezerBodiesTable, num, block.Body()); err != nil {
if err := op.Append(ChainFreezerBodiesTable, num, block.Body()); err != nil {
return fmt.Errorf("can't append block body %d: %v", num, err)
}
if err := op.Append(chainFreezerReceiptTable, num, receipts); err != nil {
if err := op.Append(ChainFreezerReceiptTable, num, receipts); err != nil {
return fmt.Errorf("can't append block %d receipts: %v", num, err)
}
if err := op.Append(chainFreezerDifficultyTable, num, td); err != nil {
if err := op.Append(ChainFreezerDifficultyTable, num, td); err != nil {
return fmt.Errorf("can't append block %d total difficulty: %v", num, err)
}
return nil
Expand Down
30 changes: 15 additions & 15 deletions core/rawdb/ancient_scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,30 @@ package rawdb

// The list of table names of chain freezer.
const (
// chainFreezerHeaderTable indicates the name of the freezer header table.
chainFreezerHeaderTable = "headers"
// ChainFreezerHeaderTable indicates the name of the freezer header table.
ChainFreezerHeaderTable = "headers"

// chainFreezerHashTable indicates the name of the freezer canonical hash table.
chainFreezerHashTable = "hashes"
// ChainFreezerHashTable indicates the name of the freezer canonical hash table.
ChainFreezerHashTable = "hashes"

// chainFreezerBodiesTable indicates the name of the freezer block body table.
chainFreezerBodiesTable = "bodies"
// ChainFreezerBodiesTable indicates the name of the freezer block body table.
ChainFreezerBodiesTable = "bodies"

// chainFreezerReceiptTable indicates the name of the freezer receipts table.
chainFreezerReceiptTable = "receipts"
// ChainFreezerReceiptTable indicates the name of the freezer receipts table.
ChainFreezerReceiptTable = "receipts"

// chainFreezerDifficultyTable indicates the name of the freezer total difficulty table.
chainFreezerDifficultyTable = "diffs"
// ChainFreezerDifficultyTable indicates the name of the freezer total difficulty table.
ChainFreezerDifficultyTable = "diffs"
)

// chainFreezerNoSnappy configures whether compression is disabled for the ancient-tables.
// Hashes and difficulties don't compress well.
var chainFreezerNoSnappy = map[string]bool{
chainFreezerHeaderTable: false,
chainFreezerHashTable: true,
chainFreezerBodiesTable: false,
chainFreezerReceiptTable: false,
chainFreezerDifficultyTable: true,
ChainFreezerHeaderTable: false,
ChainFreezerHashTable: true,
ChainFreezerBodiesTable: false,
ChainFreezerReceiptTable: false,
ChainFreezerDifficultyTable: true,
}

// The list of identifiers of ancient stores.
Expand Down
10 changes: 5 additions & 5 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,19 +280,19 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
}

// Write to the batch.
if err := op.AppendRaw(chainFreezerHashTable, number, hash[:]); err != nil {
if err := op.AppendRaw(ChainFreezerHashTable, number, hash[:]); err != nil {
return fmt.Errorf("can't write hash to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerHeaderTable, number, header); err != nil {
if err := op.AppendRaw(ChainFreezerHeaderTable, number, header); err != nil {
return fmt.Errorf("can't write header to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerBodiesTable, number, body); err != nil {
if err := op.AppendRaw(ChainFreezerBodiesTable, number, body); err != nil {
return fmt.Errorf("can't write body to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerReceiptTable, number, receipts); err != nil {
if err := op.AppendRaw(ChainFreezerReceiptTable, number, receipts); err != nil {
return fmt.Errorf("can't write receipts to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerDifficultyTable, number, td); err != nil {
if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil {
return fmt.Errorf("can't write td to Freezer: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
if i+count > frozen {
count = frozen - i
}
data, err := db.AncientRange(chainFreezerHashTable, i, count, 32*count)
data, err := db.AncientRange(ChainFreezerHashTable, i, count, 32*count)
if err != nil {
log.Crit("Failed to init database from freezer", "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
// If the freezer already contains something, ensure that the genesis blocks
// match, otherwise we might mix up freezers across chains and destroy both
// the freezer and the key-value store.
frgenesis, err := frdb.Ancient(chainFreezerHashTable, 0)
frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0)
if err != nil {
return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err)
} else if !bytes.Equal(kvgenesis, frgenesis) {
Expand Down
65 changes: 64 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ type Downloader struct {
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)

// Progress reporting metrics
syncStartBlock uint64 // Head snap block when Geth was started
syncStartTime time.Time // Time instance when chain sync started
syncLogTime time.Time // Time instance when status was last reported
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -231,7 +236,9 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentFastBlock().NumberU64(),
}
// Create the post-merge skeleton syncer and start the process
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))

go dl.stateFetcher()
Expand Down Expand Up @@ -1614,6 +1621,7 @@ func (d *Downloader) processSnapSyncContent() error {
if len(results) == 0 {
// If pivot sync is done, stop
if oldPivot == nil {
d.reportSnapSyncProgress(true)
return sync.Cancel()
}
// If sync failed, stop
Expand All @@ -1627,6 +1635,8 @@ func (d *Downloader) processSnapSyncContent() error {
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
d.reportSnapSyncProgress(false)

// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
d.pivotLock.RLock()
Expand Down Expand Up @@ -1739,7 +1749,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
}
default:
}
// Retrieve the a batch of results to import
// Retrieve the batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting snap-sync blocks", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
Expand Down Expand Up @@ -1820,3 +1830,56 @@ func (d *Downloader) readHeaderRange(last *types.Header, count int) []*types.Hea
}
return headers
}

// reportSnapSyncProgress calculates various status reports and provides it to the user.
func (d *Downloader) reportSnapSyncProgress(force bool) {
// Initialize the sync start time if it's the first time we're reporting
if d.syncStartTime.IsZero() {
d.syncStartTime = time.Now().Add(-time.Millisecond) // -1ms offset to avoid division by zero
}
// Don't report all the events, just occasionally
if !force && time.Since(d.syncLogTime) < 8*time.Second {
return
}
// Don't report anything until we have a meaningful progress
var (
headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable)
bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable)
receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable)
)
syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes)
if syncedBytes == 0 {
return
}
var (
header = d.blockchain.CurrentHeader()
block = d.blockchain.CurrentFastBlock()
)
syncedBlocks := block.NumberU64() - d.syncStartBlock
if syncedBlocks == 0 {
return
}
// Retrieve the current chain head and calculate the ETA
latest, _, err := d.skeleton.Bounds()
if err != nil {
// We're going to cheat for non-merged networks, but that's fine
latest = d.pivotHeader
}
if latest == nil {
// This should really never happen, but add some defensive code for now.
// TODO(karalabe): Remove it eventually if we don't see it blow.
log.Error("Nil latest block in sync progress report")
return
}
var (
left = latest.Number.Uint64() - block.NumberU64()
eta = time.Since(d.syncStartTime) / time.Duration(syncedBlocks) * time.Duration(left)

progress = fmt.Sprintf("%.2f%%", float64(block.NumberU64())*100/float64(latest.Number.Uint64()))
headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString())
bodies = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(bodyBytes).TerminalString())
receipts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(receiptBytes).TerminalString())
)
log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta))
d.syncLogTime = time.Now()
}
9 changes: 5 additions & 4 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type queue struct {
active *sync.Cond
closed bool

lastStatLog time.Time
logTime time.Time // Time instance when status was last reported
}

// newQueue creates a new download queue for scheduling block retrieval.
Expand Down Expand Up @@ -390,11 +390,12 @@ func (q *queue) Results(block bool) []*fetchResult {
}
}
// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
if time.Since(q.logTime) >= 60*time.Second {
q.logTime = time.Now()

info := q.Stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
log.Debug("Downloader queue stats", info...)
}
return results
}
Expand Down

0 comments on commit 90d2551

Please sign in to comment.