Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth, miner: add timeout for building sealing block #25407

Merged
merged 5 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ var (
utils.MinerExtraDataFlag,
utils.MinerRecommitIntervalFlag,
utils.MinerNoVerifyFlag,
utils.MinerNewPayloadTimeout,
utils.NATFlag,
utils.NoDiscoverFlag,
utils.DiscoveryV5Flag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,12 @@ var (
Usage: "Disable remote sealing verification",
Category: flags.MinerCategory,
}
MinerNewPayloadTimeout = &cli.DurationFlag{
Name: "miner.newpayload-timeout",
Usage: "Specify the maximum time allowance for creating a new payload",
Value: ethconfig.Defaults.Miner.NewPayloadTimeout,
Category: flags.MinerCategory,
}

// Account settings
UnlockedAccountFlag = &cli.StringFlag{
Expand Down Expand Up @@ -1647,6 +1653,9 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) {
if ctx.IsSet(MinerNoVerifyFlag.Name) {
cfg.Noverify = ctx.Bool(MinerNoVerifyFlag.Name)
}
if ctx.IsSet(MinerNewPayloadTimeout.Name) {
cfg.NewPayloadTimeout = ctx.Duration(MinerNewPayloadTimeout.Name)
}
}

func setRequiredBlocks(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down
7 changes: 2 additions & 5 deletions eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,9 @@ func TestExchangeTransitionConfig(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close()
var (
api = NewConsensusAPI(ethservice)
)

// invalid ttd
api := NewConsensusAPI(ethservice)
config := beacon.TransitionConfigurationV1{
TerminalTotalDifficulty: (*hexutil.Big)(big.NewInt(0)),
TerminalBlockHash: common.Hash{},
Expand Down Expand Up @@ -812,10 +811,8 @@ func TestInvalidBloom(t *testing.T) {

func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(100)
fmt.Println(genesis.Config.TerminalTotalDifficulty)
genesis.Config.TerminalTotalDifficulty = preMergeBlocks[0].Difficulty() //.Sub(genesis.Config.TerminalTotalDifficulty, preMergeBlocks[len(preMergeBlocks)-1].Difficulty())

fmt.Println(genesis.Config.TerminalTotalDifficulty)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close()

Expand Down
16 changes: 6 additions & 10 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,12 @@ var Defaults = Config{
TrieTimeout: 60 * time.Minute,
SnapshotCache: 102,
FilterLogCacheSize: 32,
Miner: miner.Config{
GasCeil: 30000000,
GasPrice: big.NewInt(params.GWei),
Recommit: 3 * time.Second,
},
TxPool: core.DefaultTxPoolConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
RPCTxFeeCap: 1, // 1 ether
Miner: miner.DefaultConfig,
TxPool: core.DefaultTxPoolConfig,
RPCGasCap: 50000000,
RPCEVMTimeout: 5 * time.Second,
GPO: FullNodeGPO,
RPCTxFeeCap: 1, // 1 ether
}

func init() {
Expand Down
10 changes: 10 additions & 0 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ type Config struct {
GasPrice *big.Int // Minimum gas price for mining a transaction
Recommit time.Duration // The time interval for miner to re-create mining work.
Noverify bool // Disable remote mining solution verification(only useful in ethash).

NewPayloadTimeout time.Duration // The maximum time allowance for creating a new payload
}

// DefaultConfig contains default settings for miner.
var DefaultConfig = Config{
GasCeil: 30000000,
GasPrice: big.NewInt(params.GWei),
Recommit: 3 * time.Second,
NewPayloadTimeout: 2 * time.Second,
}

// Miner creates blocks and searches for proof-of-work values.
Expand Down
116 changes: 80 additions & 36 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const (
var (
errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
errBlockInterruptedByTimeout = errors.New("timeout while building block")
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -158,6 +159,7 @@ const (
commitInterruptNone int32 = iota
commitInterruptNewHead
commitInterruptResubmit
commitInterruptTimeout
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
Expand Down Expand Up @@ -241,6 +243,13 @@ type worker struct {
// non-stop and no real transaction will be included.
noempty uint32

// newpayloadTimeout is the maximum timeout allowance for creating payload.
// The default value is 2 seconds but node operator can set it to arbitrary
// large value. A large timeout allowance may cause Geth to fail creating
// a non-empty payload within the specified time and eventually miss the slot
// in case there are some computation expensive transactions in txpool.
newpayloadTimeout time.Duration

// External functions
isLocalBlock func(header *types.Header) bool // Function used to determine whether the specified block is mined by local miner.

Expand Down Expand Up @@ -288,6 +297,16 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}
// Sanitize the timeout config for creating payload.
newpayloadTimeout := worker.config.NewPayloadTimeout
if newpayloadTimeout == 0 {
log.Warn("Sanitizing new payload timeout to default", "provided", newpayloadTimeout, "updated", DefaultConfig.NewPayloadTimeout)
newpayloadTimeout = DefaultConfig.NewPayloadTimeout
}
if newpayloadTimeout < time.Millisecond*100 {
log.Warn("Low payload timeout may cause high amount of non-full blocks", "provided", newpayloadTimeout, "default", DefaultConfig.NewPayloadTimeout)
}
worker.newpayloadTimeout = newpayloadTimeout

worker.wg.Add(4)
go worker.mainLoop()
Expand Down Expand Up @@ -844,42 +863,26 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
var coalescedLogs []*types.Log

for {
// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
// (2) worker start or restart, the interrupt signal is 1
// (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2.
// For the first two cases, the semi-finished work will be discarded.
// For the third case, the semi-finished work will be submitted to the consensus engine.
if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}
return errBlockInterruptedByRecommit
// Check interruption signal and abort building if it's fired.
if interrupt != nil {
if signal := atomic.LoadInt32(interrupt); signal != commitInterruptNone {
return signalToErr(signal)
}
return errBlockInterruptedByNewHead
}
// If we don't have enough gas for any further transactions then we're done
// If we don't have enough gas for any further transactions then we're done.
if env.gasPool.Gas() < params.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
break
}
// Retrieve the next transaction and abort if all done
// Retrieve the next transaction and abort if all done.
tx := txs.Peek()
if tx == nil {
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(env.signer, tx)

// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) {
Expand Down Expand Up @@ -926,7 +929,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
txs.Shift()
}
}

if !w.isRunning() && len(coalescedLogs) > 0 {
// We don't push the pendingLogsEvent while we are sealing. The reason is that
// when we are sealing, the worker will regenerate a sealing block every 3 seconds.
Expand All @@ -942,11 +944,6 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
}
w.pendingLogsFeed.Send(cpy)
}
// Notify resubmit loop to decrease resubmitting interval if current interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return nil
}

Expand Down Expand Up @@ -986,15 +983,15 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
}
timestamp = parent.Time() + 1
}
// Construct the sealing block header, set the extra field if it's allowed
num := parent.Number()
// Construct the sealing block header.
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
Number: new(big.Int).Add(parent.Number(), common.Big1),
GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
Time: timestamp,
Coinbase: genParams.coinbase,
}
// Set the extra field if it's allowed.
if !genParams.noExtra && len(w.extra) != 0 {
header.Extra = w.extra
}
Expand Down Expand Up @@ -1082,7 +1079,16 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
defer work.discard()

if !params.noTxs {
w.fillTransactions(nil, work)
interrupt := new(int32)
timer := time.AfterFunc(w.newpayloadTimeout, func() {
atomic.StoreInt32(interrupt, commitInterruptTimeout)
})
defer timer.Stop()

err := w.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByTimeout) {
log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout))
}
}
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}
Expand Down Expand Up @@ -1113,13 +1119,36 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(work.copy(), nil, false, start)
}

// Fill pending transactions from the txpool
// Fill pending transactions from the txpool into the block.
err = w.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByNewHead) {
switch {
case err == nil:
// The entire block is filled, decrease resubmit interval in case
// of current interval is larger than the user-specified one.
w.resubmitAdjustCh <- &intervalAdjust{inc: false}

case errors.Is(err, errBlockInterruptedByRecommit):
// Notify resubmit loop to increase resubmitting interval if the
// interruption is due to frequent commits.
gaslimit := work.header.GasLimit
ratio := float64(gaslimit-work.gasPool.Gas()) / float64(gaslimit)
if ratio < 0.1 {
ratio = 0.1
}
w.resubmitAdjustCh <- &intervalAdjust{
ratio: ratio,
inc: true,
}

case errors.Is(err, errBlockInterruptedByNewHead):
// If the block building is interrupted by newhead event, discard it
// totally. Committing the interrupted block introduces unnecessary
// delay, and possibly causes miner to mine on the previous head,
// which could result in higher uncle rate.
work.discard()
return
}
// Submit the generated block for consensus sealing.
w.commit(work.copy(), w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down Expand Up @@ -1231,3 +1260,18 @@ func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
}
return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
}

// signalToErr converts the interruption signal to a concrete error type for return.
// The given signal must be a valid interruption signal.
func signalToErr(signal int32) error {
switch signal {
case commitInterruptNewHead:
return errBlockInterruptedByNewHead
case commitInterruptResubmit:
return errBlockInterruptedByRecommit
case commitInterruptTimeout:
return errBlockInterruptedByTimeout
default:
panic(fmt.Errorf("undefined signal %d", signal))
}
}
8 changes: 4 additions & 4 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,21 +523,21 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co
}

func TestGetSealingWorkEthash(t *testing.T) {
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker(), false)
testGetSealingWork(t, ethashChainConfig, ethash.NewFaker())
}

func TestGetSealingWorkClique(t *testing.T) {
testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()), false)
testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
}

func TestGetSealingWorkPostMerge(t *testing.T) {
local := new(params.ChainConfig)
*local = *ethashChainConfig
local.TerminalTotalDifficulty = big.NewInt(0)
testGetSealingWork(t, local, ethash.NewFaker(), true)
testGetSealingWork(t, local, ethash.NewFaker())
}

func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, postMerge bool) {
func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) {
defer engine.Close()

w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0)
Expand Down