Skip to content

Commit

Permalink
consensus, miner: add stale solution handle logic
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Apr 24, 2018
1 parent 97aafcd commit e0f84a4
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 32 deletions.
3 changes: 3 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,7 @@ type PoW interface {

// Hashrate returns the current mining hashrate of a PoW consensus engine.
Hashrate() float64

// Stale returns a channel for external to fetch all valid but stale solutions.
Stale() <-chan *types.Block
}
9 changes: 9 additions & 0 deletions consensus/ethash/ethash.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ type Ethash struct {
// Remote sealer related fields
workCh chan *types.Block // Notification channel to push new work to remote sealer
resultCh chan *types.Block // Channel used by mining threads to return result
staleCh chan *types.Block // Channel used by external module to fetch stale solution.
fetchWorkCh chan *sealWork // Channel used for remote sealer to fetch mining work
submitWorkCh chan *mineResult // Channel used for remote sealer to submit their mining result
fetchRateCh chan chan uint64 // Channel used to gather submitted hash rate for local or remote sealer.
Expand Down Expand Up @@ -467,6 +468,7 @@ func New(config Config) *Ethash {
hashrate: metrics.NewMeter(),
workCh: make(chan *types.Block),
resultCh: make(chan *types.Block),
staleCh: make(chan *types.Block),
fetchWorkCh: make(chan *sealWork),
submitWorkCh: make(chan *mineResult),
fetchRateCh: make(chan chan uint64),
Expand All @@ -488,6 +490,7 @@ func NewTester() *Ethash {
hashrate: metrics.NewMeter(),
workCh: make(chan *types.Block),
resultCh: make(chan *types.Block),
staleCh: make(chan *types.Block),
fetchWorkCh: make(chan *sealWork),
submitWorkCh: make(chan *mineResult),
fetchRateCh: make(chan chan uint64),
Expand Down Expand Up @@ -660,6 +663,12 @@ func (ethash *Ethash) Hashrate() float64 {
return ethash.hashrate.Rate1() + float64(total)
}

// Stale implements PoW, exposing stale solution channel for external
// to fetch all stale but valid solutions.
func (ethash *Ethash) Stale() <-chan *types.Block {
return ethash.staleCh
}

// APIs implements consensus.Engine, returning the user facing RPC APIs.
func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API {
// In order to ensure backward compatibility, we exposes ethash RPC APIs
Expand Down
125 changes: 93 additions & 32 deletions consensus/ethash/sealer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ import (
"github.com/ethereum/go-ethereum/log"
)

const (
averageBlockTime = 15 * time.Second // The average time to find the solution for mining block.
hashrateExpireTime = 10 * time.Second // The expire time for submitted hash rate.
)

var (
errNoMiningWork = errors.New("no mining work available yet, don't panic")
errInvalidSealResult = errors.New("invalid or stale proof-of-work solution")
Expand Down Expand Up @@ -161,14 +166,48 @@ search:
runtime.KeepAlive(dataset)
}

// miningWork is an environment for a mining block and holds
// all of the mining information
type miningWork struct {
block *types.Block
powHash common.Hash
seedHash common.Hash
target common.Hash
createdAt time.Time
}

// remote starts a standalone goroutine to handle remote mining related stuff.
func (ethash *Ethash) remote() {
var (
works = make(map[common.Hash]*types.Block)
rates = make(map[common.Hash]hashrate)
currentWork *types.Block
works = make(map[common.Hash]*miningWork)
rates = make(map[common.Hash]hashrate)
staleSolutions = make(map[common.Hash]*types.Block)
currentBlock *types.Block
)

// prepareWork generates a mining work for remote miner when receives a new mining block.
prepareWork := func(newBlock *types.Block) *miningWork {
block := *newBlock

// Calculate the "target" to be returned to the external sealer.
n := big.NewInt(1)
n.Lsh(n, 255)
n.Div(n, block.Difficulty())
n.Lsh(n, 1)
target := common.BytesToHash(n.Bytes())

work := &miningWork{
block: &block,
powHash: block.HashNoNonce(),
seedHash: common.BytesToHash(SeedHash(block.NumberU64())),
target: target,
createdAt: time.Now(),
}
// Track the mining work prepared for remote miner.
works[block.HashNoNonce()] = work
return work
}

// getWork returns a work package for external miner.
//
// The work package consists of 3 strings:
Expand All @@ -177,21 +216,15 @@ func (ethash *Ethash) remote() {
// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
getWork := func() ([3]string, error) {
var res [3]string
if currentWork == nil {
if currentBlock == nil {
return res, errNoMiningWork
}
res[0] = currentWork.HashNoNonce().Hex()
res[1] = common.BytesToHash(SeedHash(currentWork.NumberU64())).Hex()

// Calculate the "target" to be returned to the external sealer.
n := big.NewInt(1)
n.Lsh(n, 255)
n.Div(n, currentWork.Difficulty())
n.Lsh(n, 1)
res[2] = common.BytesToHash(n.Bytes()).Hex()

// Trace the seal work fetched by remote sealer.
works[currentWork.HashNoNonce()] = currentWork
work := works[currentBlock.HashNoNonce()]
if work == nil {
// regenerate the mining work for current block if the previous one has been cleaned.
work = prepareWork(currentBlock)
}
res[0], res[1], res[2] = work.powHash.Hex(), work.seedHash.Hex(), work.target.Hex()
return res, nil
}

Expand All @@ -200,14 +233,14 @@ func (ethash *Ethash) remote() {
// any other error, like no pending work or stale mining result).
submitWork := func(nonce types.BlockNonce, mixDigest common.Hash, hash common.Hash) bool {
// Make sure the work submitted is present
block := works[hash]
if block == nil {
work := works[hash]
if work == nil {
log.Info("Work submitted but none pending", "hash", hash)
return false
}

// Verify the correctness of submitted result.
header := block.Header()
header := work.block.Header()
header.Nonce = nonce
header.MixDigest = mixDigest
if err := ethash.VerifySeal(nil, header); err != nil {
Expand All @@ -222,13 +255,36 @@ func (ethash *Ethash) remote() {
}

// Solutions seems to be valid, return to the miner and notify acceptance.
select {
case ethash.resultCh <- block.WithSeal(header):
delete(works, hash)
return true
default:
log.Info("Work submitted is stale", "hash", hash)
return false
block := work.block.WithSeal(header)
if hash == currentBlock.HashNoNonce() {
select {
case ethash.resultCh <- block:
default:
// CPU miner has already found the solution, save the stale solution temporarily.
staleSolutions[block.Hash()] = block
}
} else {
// Save the stale solution temporarily.
staleSolutions[block.Hash()] = block
}
delete(works, hash)
return true
}

// submitStaleSolution passes all valid but stale solutions to the stale channel.
submitStaleSolution := func() {
var submitted int
for hash, solution := range staleSolutions {
select {
case ethash.staleCh <- solution:
submitted += 1
delete(staleSolutions, hash)
default:
break
}
}
if submitted > 0 {
log.Info("submitted stale solutions", "number", submitted, "remain", len(staleSolutions))
}
}

Expand All @@ -237,15 +293,14 @@ func (ethash *Ethash) remote() {

running:
for {
submitStaleSolution()

select {
case block := <-ethash.workCh:
if currentWork != nil && block.ParentHash() != currentWork.ParentHash() {
// Start new round mining, throw out all previous work.
works = make(map[common.Hash]*types.Block)
}
// Update current work with new received block.
// Note same work can be past twice, happens when changing CPU threads.
currentWork = block
prepareWork(block)
currentBlock = block

case work := <-ethash.fetchWorkCh:
// Return current mining work to remote miner.
Expand Down Expand Up @@ -282,10 +337,16 @@ running:
case <-ticker.C:
// Clear stale submitted hash rate.
for id, rate := range rates {
if time.Since(rate.ping) > 10*time.Second {
if time.Since(rate.ping) > hashrateExpireTime {
delete(rates, id)
}
}
// Clear stale mining works
for hash, work := range works {
if time.Since(work.createdAt) > 7*averageBlockTime {
delete(works, hash)
}
}

case errCh := <-ethash.exitCh:
// Exit remote loop if ethash is closed and return relevant error.
Expand Down
43 changes: 43 additions & 0 deletions miner/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package miner
import (
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/log"
)
Expand All @@ -28,6 +30,7 @@ type CpuAgent struct {
mu sync.Mutex

workCh chan *Work
works map[common.Hash]*Work
stop chan struct{}
quitCurrentOp chan struct{}
returnCh chan<- *Result
Expand All @@ -44,6 +47,7 @@ func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent
engine: engine,
stop: make(chan struct{}, 1),
workCh: make(chan *Work, 1),
works: make(map[common.Hash]*Work),
}
return miner
}
Expand Down Expand Up @@ -72,9 +76,14 @@ func (self *CpuAgent) Start() {
return // agent already started
}
go self.update()

if _, ok := self.engine.(consensus.PoW); ok {
go self.submitStale()
}
}

func (self *CpuAgent) update() {
ticker := time.NewTicker(5 * time.Second)
out:
for {
select {
Expand All @@ -85,7 +94,20 @@ out:
}
self.quitCurrentOp = make(chan struct{})
go self.mine(work, self.quitCurrentOp)
// Track the mining work
self.works[work.Block.HashNoNonce()] = work
self.mu.Unlock()

case <-ticker.C:
// Clear out stale mining works
self.mu.Lock()
for hash, work := range self.works {
if time.Since(work.createdAt) > 10*(15*time.Second) {
delete(self.works, hash)
}
}
self.mu.Unlock()

case <-self.stop:
self.mu.Lock()
if self.quitCurrentOp != nil {
Expand All @@ -98,6 +120,27 @@ out:
}
}

// submitStale polls on the stale channel provided by PoW consensus engine,
// submits all valid but stale solution to miner.
func (self *CpuAgent) submitStale() {
pow := self.engine.(consensus.PoW)
out:
for {
select {
case solution := <-pow.Stale():
self.mu.Lock()
work := self.works[solution.HashNoNonce()]
self.mu.Unlock()
if work == nil {
continue
}
self.returnCh <- &Result{work, solution}
case <-self.stop:
break out
}
}
}

func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) {
if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil {
log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash())
Expand Down

0 comments on commit e0f84a4

Please sign in to comment.