Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

miner: discard interrupted blocks #24638

Merged
merged 6 commits into from
May 6, 2022
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ const (
staleThreshold = 7
)

var (
errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
)

// environment is the worker's current environment and holds all
// information of the sealing block generation.
type environment struct {
Expand Down Expand Up @@ -841,7 +846,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -866,8 +871,9 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
ratio: ratio,
inc: true,
}
return errBlockInterruptedByRecommit
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
return errBlockInterruptedByNewHead
}
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -951,7 +957,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false
return nil
}

// generateParams wraps various of settings for generating sealing task.
Expand Down Expand Up @@ -1050,7 +1056,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment) {
func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
Expand All @@ -1063,16 +1069,20 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) {
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
err := w.commitTransactions(env, txs, interrupt)
if err != nil {
Ruteri marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
err := w.commitTransactions(env, txs, interrupt)
if err != nil {
Ruteri marked this conversation as resolved.
Show resolved Hide resolved
return err
}
}

Ruteri marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// generateWork generates a sealing block based on the given parameters.
Expand All @@ -1084,6 +1094,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
defer work.discard()

w.fillTransactions(nil, work)

return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

Expand Down Expand Up @@ -1113,8 +1124,14 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(work.copy(), nil, false, start)
}

// Fill pending transactions from the txpool
w.fillTransactions(interrupt, work)
err = w.fillTransactions(interrupt, work)
if err != nil && !errors.Is(err, errBlockInterruptedByRecommit) {
Ruteri marked this conversation as resolved.
Show resolved Hide resolved
work.discard()
return
}

w.commit(work.copy(), w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down