Skip to content

Commit

Permalink
core: use finalized block as the chain freeze indicator (#28683)
Browse files Browse the repository at this point in the history
* core: use finalized block as the chain freeze indicator

* core/rawdb: use max(finality, head-90k) as chain freezing threshold

* core/rawdb: fix tests

* core/rawdb: fix lint

* core/rawdb: address comments from peter

* core/rawdb: fix typo
  • Loading branch information
rjl493456442 authored Mar 4, 2024
1 parent a97d622 commit ca473b8
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 55 deletions.
10 changes: 7 additions & 3 deletions core/blockchain_repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1757,7 +1757,7 @@ func testRepair(t *testing.T, tt *rewindTest, snapshots bool) {

func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme string) {
// It's hard to follow the test case, visualize the input
//log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true))))
// fmt.Println(tt.dump(true))

// Create a temporary persistent database
Expand Down Expand Up @@ -1830,10 +1830,14 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
// Force run a freeze cycle
type freezer interface {
Freeze(threshold uint64) error
Freeze() error
Ancients() (uint64, error)
}
db.(freezer).Freeze(tt.freezeThreshold)
if tt.freezeThreshold < uint64(tt.canonicalBlocks) {
final := uint64(tt.canonicalBlocks) - tt.freezeThreshold
chain.SetFinalized(canonblocks[int(final)-1].Header())
}
db.(freezer).Freeze()

// Set the simulated pivot block
if tt.pivotBlock != nil {
Expand Down
8 changes: 6 additions & 2 deletions core/blockchain_sethead_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2044,10 +2044,14 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme

// Force run a freeze cycle
type freezer interface {
Freeze(threshold uint64) error
Freeze() error
Ancients() (uint64, error)
}
db.(freezer).Freeze(tt.freezeThreshold)
if tt.freezeThreshold < uint64(tt.canonicalBlocks) {
final := uint64(tt.canonicalBlocks) - tt.freezeThreshold
chain.SetFinalized(canonblocks[int(final)-1].Header())
}
db.(freezer).Freeze()

// Set the simulated pivot block
if tt.pivotBlock != nil {
Expand Down
113 changes: 70 additions & 43 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package rawdb

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -43,8 +43,6 @@ const (
// The background thread will keep moving ancient chain segments from key-value
// database to flat files for saving space on live database.
type chainFreezer struct {
threshold atomic.Uint64 // Number of recent blocks not to freeze (params.FullImmutabilityThreshold apart from tests)

*Freezer
quit chan struct{}
wg sync.WaitGroup
Expand All @@ -57,13 +55,11 @@ func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFre
if err != nil {
return nil, err
}
cf := chainFreezer{
return &chainFreezer{
Freezer: freezer,
quit: make(chan struct{}),
trigger: make(chan chan struct{}),
}
cf.threshold.Store(params.FullImmutabilityThreshold)
return &cf, nil
}, nil
}

// Close closes the chain freezer instance and terminates the background thread.
Expand All @@ -77,6 +73,57 @@ func (f *chainFreezer) Close() error {
return f.Freezer.Close()
}

// readHeadNumber returns the number of chain head block. 0 is returned if the
// block is unknown or not available yet.
func (f *chainFreezer) readHeadNumber(db ethdb.KeyValueReader) uint64 {
hash := ReadHeadBlockHash(db)
if hash == (common.Hash{}) {
log.Error("Head block is not reachable")
return 0
}
number := ReadHeaderNumber(db, hash)
if number == nil {
log.Error("Number of head block is missing")
return 0
}
return *number
}

// readFinalizedNumber returns the number of finalized block. 0 is returned
// if the block is unknown or not available yet.
func (f *chainFreezer) readFinalizedNumber(db ethdb.KeyValueReader) uint64 {
hash := ReadFinalizedBlockHash(db)
if hash == (common.Hash{}) {
return 0
}
number := ReadHeaderNumber(db, hash)
if number == nil {
log.Error("Number of finalized block is missing")
return 0
}
return *number
}

// freezeThreshold returns the threshold for chain freezing. It's determined
// by formula: max(finality, HEAD-params.FullImmutabilityThreshold).
func (f *chainFreezer) freezeThreshold(db ethdb.KeyValueReader) (uint64, error) {
var (
head = f.readHeadNumber(db)
final = f.readFinalizedNumber(db)
headLimit uint64
)
if head > params.FullImmutabilityThreshold {
headLimit = head - params.FullImmutabilityThreshold
}
if final == 0 && headLimit == 0 {
return 0, errors.New("freezing threshold is not available")
}
if final > headLimit {
return final, nil
}
return headLimit, nil
}

// freeze is a background thread that periodically checks the blockchain for any
// import progress and moves ancient data from the fast database into the freezer.
//
Expand Down Expand Up @@ -114,60 +161,39 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
return
}
}
// Retrieve the freezing threshold.
hash := ReadHeadBlockHash(nfdb)
if hash == (common.Hash{}) {
log.Debug("Current full block hash unavailable") // new chain, empty database
threshold, err := f.freezeThreshold(nfdb)
if err != nil {
backoff = true
log.Debug("Current full block not old enough to freeze", "err", err)
continue
}
number := ReadHeaderNumber(nfdb, hash)
threshold := f.threshold.Load()
frozen := f.frozen.Load()
switch {
case number == nil:
log.Error("Current full block number unavailable", "hash", hash)
backoff = true
continue

case *number < threshold:
log.Debug("Current full block not old enough to freeze", "number", *number, "hash", hash, "delay", threshold)
backoff = true
continue

case *number-threshold <= frozen:
log.Debug("Ancient blocks frozen already", "number", *number, "hash", hash, "frozen", frozen)
// Short circuit if the blocks below threshold are already frozen.
if frozen != 0 && frozen-1 >= threshold {
backoff = true
log.Debug("Ancient blocks frozen already", "threshold", threshold, "frozen", frozen)
continue
}
head := ReadHeader(nfdb, hash, *number)
if head == nil {
log.Error("Current full block unavailable", "number", *number, "hash", hash)
backoff = true
continue
}

// Seems we have data ready to be frozen, process in usable batches
var (
start = time.Now()
first, _ = f.Ancients()
limit = *number - threshold
start = time.Now()
first = frozen // the first block to freeze
last = threshold // the last block to freeze
)
if limit-first > freezerBatchLimit {
limit = first + freezerBatchLimit
if last-first+1 > freezerBatchLimit {
last = freezerBatchLimit + first - 1
}
ancients, err := f.freezeRange(nfdb, first, limit)
ancients, err := f.freezeRange(nfdb, first, last)
if err != nil {
log.Error("Error in block freeze operation", "err", err)
backoff = true
continue
}

// Batch of blocks have been frozen, flush them before wiping from leveldb
if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err)
}

// Wipe out all data from the active database
batch := db.NewBatch()
for i := 0; i < len(ancients); i++ {
Expand Down Expand Up @@ -250,8 +276,11 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
}
}

// freezeRange moves a batch of chain segments from the fast database to the freezer.
// The parameters (number, limit) specify the relevant block range, both of which
// are included.
func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hashes []common.Hash, err error) {
hashes = make([]common.Hash, 0, limit-number)
hashes = make([]common.Hash, 0, limit-number+1)

_, err = f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for ; number <= limit; number++ {
Expand Down Expand Up @@ -293,11 +322,9 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil {
return fmt.Errorf("can't write td to Freezer: %v", err)
}

hashes = append(hashes, hash)
}
return nil
})

return hashes, err
}
8 changes: 1 addition & 7 deletions core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,10 @@ func (frdb *freezerdb) Close() error {
// Freeze is a helper method used for external testing to trigger and block until
// a freeze cycle completes, without having to sleep for a minute to trigger the
// automatic background run.
func (frdb *freezerdb) Freeze(threshold uint64) error {
func (frdb *freezerdb) Freeze() error {
if frdb.AncientStore.(*chainFreezer).readonly {
return errReadOnly
}
// Set the freezer threshold to a temporary value
defer func(old uint64) {
frdb.AncientStore.(*chainFreezer).threshold.Store(old)
}(frdb.AncientStore.(*chainFreezer).threshold.Load())
frdb.AncientStore.(*chainFreezer).threshold.Store(threshold)

// Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1)
frdb.AncientStore.(*chainFreezer).trigger <- trigger
Expand Down

0 comments on commit ca473b8

Please sign in to comment.