Skip to content

Commit

Permalink
Merge pull request #4301 from filecoin-project/steb/fix-two-races
Browse files Browse the repository at this point in the history
Fix two races
  • Loading branch information
magik6k authored Oct 10, 2020
2 parents 151577f + c463582 commit a491fc9
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 33 deletions.
2 changes: 1 addition & 1 deletion chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -1726,7 +1726,7 @@ func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []b
return gen.VerifyVRF(ctx, worker, rand, evrf)
}

func (syncer *Syncer) State() []SyncerState {
func (syncer *Syncer) State() []SyncerStateSnapshot {
return syncer.syncmgr.State()
}

Expand Down
15 changes: 9 additions & 6 deletions chain/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type SyncManager interface {
SetPeerHead(ctx context.Context, p peer.ID, ts *types.TipSet)

// State retrieves the state of the sync workers.
State() []SyncerState
State() []SyncerStateSnapshot
}

type syncManager struct {
Expand Down Expand Up @@ -79,7 +79,7 @@ type syncResult struct {
const syncWorkerCount = 3

func NewSyncManager(sync SyncFunc) SyncManager {
return &syncManager{
sm := &syncManager{
bspThresh: 1,
peerHeads: make(map[peer.ID]*types.TipSet),
syncTargets: make(chan *types.TipSet),
Expand All @@ -90,6 +90,10 @@ func NewSyncManager(sync SyncFunc) SyncManager {
doSync: sync,
stop: make(chan struct{}),
}
for i := range sm.syncStates {
sm.syncStates[i] = new(SyncerState)
}
return sm
}

func (sm *syncManager) Start() {
Expand Down Expand Up @@ -128,8 +132,8 @@ func (sm *syncManager) SetPeerHead(ctx context.Context, p peer.ID, ts *types.Tip
sm.incomingTipSets <- ts
}

func (sm *syncManager) State() []SyncerState {
ret := make([]SyncerState, 0, len(sm.syncStates))
func (sm *syncManager) State() []SyncerStateSnapshot {
ret := make([]SyncerStateSnapshot, 0, len(sm.syncStates))
for _, s := range sm.syncStates {
ret = append(ret, s.Snapshot())
}
Expand Down Expand Up @@ -405,8 +409,7 @@ func (sm *syncManager) scheduleWorkSent() {
}

func (sm *syncManager) syncWorker(id int) {
ss := &SyncerState{}
sm.syncStates[id] = ss
ss := sm.syncStates[id]
for {
select {
case ts, ok := <-sm.syncTargets:
Expand Down
46 changes: 21 additions & 25 deletions chain/syncstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
)

type SyncerState struct {
lk sync.Mutex
type SyncerStateSnapshot struct {
Target *types.TipSet
Base *types.TipSet
Stage api.SyncStateStage
Expand All @@ -22,16 +21,21 @@ type SyncerState struct {
End time.Time
}

type SyncerState struct {
lk sync.Mutex
data SyncerStateSnapshot
}

func (ss *SyncerState) SetStage(v api.SyncStateStage) {
if ss == nil {
return
}

ss.lk.Lock()
defer ss.lk.Unlock()
ss.Stage = v
ss.data.Stage = v
if v == api.StageSyncComplete {
ss.End = build.Clock.Now()
ss.data.End = build.Clock.Now()
}
}

Expand All @@ -42,13 +46,13 @@ func (ss *SyncerState) Init(base, target *types.TipSet) {

ss.lk.Lock()
defer ss.lk.Unlock()
ss.Target = target
ss.Base = base
ss.Stage = api.StageHeaders
ss.Height = 0
ss.Message = ""
ss.Start = build.Clock.Now()
ss.End = time.Time{}
ss.data.Target = target
ss.data.Base = base
ss.data.Stage = api.StageHeaders
ss.data.Height = 0
ss.data.Message = ""
ss.data.Start = build.Clock.Now()
ss.data.End = time.Time{}
}

func (ss *SyncerState) SetHeight(h abi.ChainEpoch) {
Expand All @@ -58,7 +62,7 @@ func (ss *SyncerState) SetHeight(h abi.ChainEpoch) {

ss.lk.Lock()
defer ss.lk.Unlock()
ss.Height = h
ss.data.Height = h
}

func (ss *SyncerState) Error(err error) {
Expand All @@ -68,21 +72,13 @@ func (ss *SyncerState) Error(err error) {

ss.lk.Lock()
defer ss.lk.Unlock()
ss.Message = err.Error()
ss.Stage = api.StageSyncErrored
ss.End = build.Clock.Now()
ss.data.Message = err.Error()
ss.data.Stage = api.StageSyncErrored
ss.data.End = build.Clock.Now()
}

func (ss *SyncerState) Snapshot() SyncerState {
func (ss *SyncerState) Snapshot() SyncerStateSnapshot {
ss.lk.Lock()
defer ss.lk.Unlock()
return SyncerState{
Base: ss.Base,
Target: ss.Target,
Stage: ss.Stage,
Height: ss.Height,
Message: ss.Message,
Start: ss.Start,
End: ss.End,
}
return ss.data
}
4 changes: 3 additions & 1 deletion node/modules/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.
return xerrors.Errorf("failed to subscribe to event bus: %w", err)
}

ctx := helpers.LifecycleCtx(mctx, lc)

go func() {
for evt := range sub.Out() {
pic := evt.(event.EvtPeerIdentificationCompleted)
go func() {
if err := svc.SayHello(helpers.LifecycleCtx(mctx, lc), pic.Peer); err != nil {
if err := svc.SayHello(ctx, pic.Peer); err != nil {
protos, _ := h.Peerstore().GetProtocols(pic.Peer)
agent, _ := h.Peerstore().Get(pic.Peer, "AgentVersion")
if protosContains(protos, hello.ProtocolID) {
Expand Down

0 comments on commit a491fc9

Please sign in to comment.