Skip to content

Commit

Permalink
Add the function for detecting the orphan blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Terryhung committed Oct 2, 2023
1 parent 64c3a8f commit a6e1997
Showing 1 changed file with 76 additions and 4 deletions.
80 changes: 76 additions & 4 deletions commands/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
lotuscli "github.com/filecoin-project/lotus/cli"
cid "github.com/ipfs/go-cid"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -252,6 +254,17 @@ type syncOpts struct {

var syncFlags syncOpts

type SyncingState struct {
UnsyncedBlockHeadersByEpoch map[int64][]*blocks.UnsyncedBlockHeader
sync.Mutex
}

func (ss *SyncingState) SetBlockHeaderToMap(block *blocks.UnsyncedBlockHeader) {
ss.Mutex.Lock()
defer ss.Mutex.Unlock()
ss.UnsyncedBlockHeadersByEpoch[block.Height] = append(ss.UnsyncedBlockHeadersByEpoch[block.Height], block)
}

var SyncIncomingBlockCmd = &cli.Command{
Name: "blocks",
Usage: "Start to get incoming block",
Expand Down Expand Up @@ -302,22 +315,81 @@ var SyncIncomingBlockCmd = &cli.Command{
}
}

go getSubBlocks(ctx, lapi, strg)
state := &SyncingState{
UnsyncedBlockHeadersByEpoch: make(map[int64][]*blocks.UnsyncedBlockHeader),
}

go detectOrphanBlocks(ctx, lapi, state)
go getIncomingBlocks(ctx, lapi, strg, state)

<-ctx.Done()
return nil
},
}

func getSubBlocks(ctx context.Context, lapi lily.LilyAPI, strg model.Storage) {
sub, err := lapi.SyncIncomingBlocks(ctx)
func detectOrphanBlocks(ctx context.Context, lapi lily.LilyAPI, state *SyncingState) {
for range time.Tick(30 * time.Second) {
state.Mutex.Lock()

// Get the latestEpoch in map
latestEpoch := int64(0)
for k := range state.UnsyncedBlockHeadersByEpoch {
if k > latestEpoch {
latestEpoch = k
}
}

// Check old tipset
targetEpoch := latestEpoch - 5
oldEpoches := []int64{}
for epoch, unsyncedBlocks := range state.UnsyncedBlockHeadersByEpoch {
if epoch <= int64(targetEpoch) {
// Store the old tipset, we should clear it after checking
oldEpoches = append(oldEpoches, epoch)

oldTs, err := lapi.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(epoch), types.EmptyTSK)
if err != nil {
log.Errorf("Error at getting the old tipset: %v", err)
continue
}
log.Infof("Get header cids: %v at Height: %v", latestEpoch, oldTs.Cids(), oldTs.Height())

cidMap := make(map[string]bool)
for _, cid := range oldTs.Cids() {
cidMap[cid.String()] = true
}
orphanBlocks := []*blocks.UnsyncedBlockHeader{}
for _, block := range unsyncedBlocks {
if _, exists := cidMap[block.Cid]; !exists {
orphanBlocks = append(orphanBlocks, block)
}
}

// To do set the orphan to Storage
if len(orphanBlocks) > 0 {
log.Errorf("Get Orphan blocks: %v", orphanBlocks)
}
}
}

// Clean the map
for _, epoch := range oldEpoches {
delete(state.UnsyncedBlockHeadersByEpoch, epoch)
}
state.Mutex.Unlock()
}
}

func getIncomingBlocks(ctx context.Context, lapi lily.LilyAPI, strg model.Storage, state *SyncingState) {
incomingBlocks, err := lapi.SyncIncomingBlocks(ctx)
if err != nil {
log.Error(err)
return
}

for bh := range sub {
for bh := range incomingBlocks {
block := blocks.NewUnsyncedBlockHeader(bh)
state.SetBlockHeaderToMap(block)
if strg == nil {
log.Infof("Block Height: %v, Miner: %v, Cid: %v", block.Height, block.Miner, block.Cid)
} else {
Expand Down

0 comments on commit a6e1997

Please sign in to comment.