Skip to content

Commit

Permalink
Merge pull request #10840 from filecoin-project/asr/splitstore-warm
Browse files Browse the repository at this point in the history
feat: fix deadlock in splitstore-mpool interaction
  • Loading branch information
arajasek authored May 10, 2023
2 parents 86723a3 + 52e7546 commit 298b2b4
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 19 deletions.
10 changes: 4 additions & 6 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type SplitStore struct {
path string

mx sync.Mutex
warmupEpoch abi.ChainEpoch // protected by mx
warmupEpoch atomic.Int64
baseEpoch abi.ChainEpoch // protected by compaction lock
pruneEpoch abi.ChainEpoch // protected by compaction lock

Expand Down Expand Up @@ -684,9 +684,7 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro
}

func (s *SplitStore) isWarm() bool {
s.mx.Lock()
defer s.mx.Unlock()
return s.warmupEpoch > 0
return s.warmupEpoch.Load() > 0
}

// State tracking
Expand Down Expand Up @@ -757,7 +755,7 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
bs, err = s.ds.Get(s.ctx, warmupEpochKey)
switch err {
case nil:
s.warmupEpoch = bytesToEpoch(bs)
s.warmupEpoch.Store(bytesToInt64(bs))

case dstore.ErrNotFound:
warmup = true
Expand Down Expand Up @@ -791,7 +789,7 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error
return xerrors.Errorf("error loading compaction index: %w", err)
}

log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch)
log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch.Load())

if warmup {
err = s.warmup(curTs)
Expand Down
2 changes: 1 addition & 1 deletion blockstore/splitstore/splitstore_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
func (s *SplitStore) Info() map[string]interface{} {
info := make(map[string]interface{})
info["base epoch"] = s.baseEpoch
info["warmup epoch"] = s.warmupEpoch
info["warmup epoch"] = s.warmupEpoch.Load()
info["compactions"] = s.compactionIndex
info["prunes"] = s.pruneIndex
info["compacting"] = s.compacting == 1
Expand Down
4 changes: 2 additions & 2 deletions blockstore/splitstore/splitstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.
}
defer ss.Close() //nolint

ss.warmupEpoch = 1
ss.warmupEpoch.Store(1)
go ss.reifyOrchestrator()

waitForReification := func() {
Expand Down Expand Up @@ -529,7 +529,7 @@ func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blocks
}
defer ss.Close() //nolint

ss.warmupEpoch = 1
ss.warmupEpoch.Store(1)
go ss.reifyOrchestrator()

waitForReification := func() {
Expand Down
5 changes: 2 additions & 3 deletions blockstore/splitstore/splitstore_warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
if err != nil {
return xerrors.Errorf("error saving warm up epoch: %w", err)
}
s.mx.Lock()
s.warmupEpoch = epoch
s.mx.Unlock()

s.warmupEpoch.Store(int64(epoch))

// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
Expand Down
8 changes: 2 additions & 6 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,8 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
return mp, nil
}

func (mp *MessagePool) TryForEachPendingMessage(f func(cid.Cid) error) error {
// avoid deadlocks in splitstore compaction when something else needs to access the blockstore
// while holding the mpool lock
if !mp.lk.TryLock() {
return xerrors.Errorf("mpool TryForEachPendingMessage: could not acquire lock")
}
func (mp *MessagePool) ForEachPendingMessage(f func(cid.Cid) error) error {
mp.lk.Lock()
defer mp.lk.Unlock()

for _, mset := range mp.pending {
Expand Down
2 changes: 1 addition & 1 deletion node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func MessagePool(lc fx.Lifecycle, mctx helpers.MetricsCtx, us stmgr.UpgradeSched
return mp.Close()
},
})
protector.AddProtector(mp.TryForEachPendingMessage)
protector.AddProtector(mp.ForEachPendingMessage)
return mp, nil
}

Expand Down

0 comments on commit 298b2b4

Please sign in to comment.