From 3f165ccedcec7331617e0fc0568dbb001301e73c Mon Sep 17 00:00:00 2001 From: frrist Date: Wed, 26 Aug 2020 13:12:01 -0700 Subject: [PATCH] fix: deadlock in miner market handler --- cmd/lotus-chainwatch/processor/market.go | 6 ------ cmd/lotus-chainwatch/processor/miner.go | 13 ++----------- cmd/lotus-chainwatch/processor/processor.go | 3 --- 3 files changed, 2 insertions(+), 20 deletions(-) diff --git a/cmd/lotus-chainwatch/processor/market.go b/cmd/lotus-chainwatch/processor/market.go index 426005ac31f..9577f341ed7 100644 --- a/cmd/lotus-chainwatch/processor/market.go +++ b/cmd/lotus-chainwatch/processor/market.go @@ -96,12 +96,6 @@ func (p *Processor) HandleMarketChanges(ctx context.Context, marketTips ActorTip log.Fatalw("Failed to persist market actors", "error", err) } - // we persist the dealID <--> minerID,sectorID here since the dealID needs to be stored above first - if err := p.storePreCommitDealInfo(p.sectorDealEvents); err != nil { - close(p.sectorDealEvents) - return err - } - if err := p.updateMarket(ctx, marketChanges); err != nil { log.Fatalw("Failed to update market actors", "error", err) } diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index 6e4d40decd6..b9835742a04 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -271,7 +271,6 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) preCommitEvents := make(chan *MinerSectorsEvent, 8) sectorEvents := make(chan *MinerSectorsEvent, 8) partitionEvents := make(chan *MinerSectorsEvent, 8) - p.sectorDealEvents = make(chan *SectorDealEvent, 8) grp.Go(func() error { return p.storeMinerSectorEvents(ctx, sectorEvents, preCommitEvents, partitionEvents) @@ -280,9 +279,8 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) grp.Go(func() error { defer func() { close(preCommitEvents) - close(p.sectorDealEvents) }() - return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents, p.sectorDealEvents) + return p.storeMinerPreCommitInfo(ctx, miners, preCommitEvents) }) grp.Go(func() error { @@ -298,7 +296,7 @@ func (p *Processor) persistMiners(ctx context.Context, miners []minerActorInfo) return grp.Wait() } -func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent, sectorDeals chan<- *SectorDealEvent) error { +func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerActorInfo, sectorEvents chan<- *MinerSectorsEvent) error { tx, err := p.db.Begin() if err != nil { return err @@ -334,13 +332,6 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA preCommitAdded := make([]uint64, len(changes.Added)) for i, added := range changes.Added { - if len(added.Info.DealIDs) > 0 { - sectorDeals <- &SectorDealEvent{ - MinerID: m.common.addr, - SectorID: uint64(added.Info.SectorNumber), - DealIDs: added.Info.DealIDs, - } - } if added.Info.ReplaceCapacity { if _, err := stmt.Exec( m.common.addr.String(), diff --git a/cmd/lotus-chainwatch/processor/processor.go b/cmd/lotus-chainwatch/processor/processor.go index c7540494358..e05ac255e76 100644 --- a/cmd/lotus-chainwatch/processor/processor.go +++ b/cmd/lotus-chainwatch/processor/processor.go @@ -35,9 +35,6 @@ type Processor struct { // number of blocks processed at a time batch int - - // process communication channels - sectorDealEvents chan *SectorDealEvent } type ActorTips map[types.TipSetKey][]actorInfo