diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go index ba53feb7702..1f1ba0e992d 100644 --- a/blockstore/splitstore/splitstore.go +++ b/blockstore/splitstore/splitstore.go @@ -164,7 +164,7 @@ type SplitStore struct { path string mx sync.Mutex - warmupEpoch abi.ChainEpoch // protected by mx + warmupEpoch atomic.Int64 baseEpoch abi.ChainEpoch // protected by compaction lock pruneEpoch abi.ChainEpoch // protected by compaction lock @@ -684,9 +684,7 @@ func (s *SplitStore) View(ctx context.Context, cid cid.Cid, cb func([]byte) erro } func (s *SplitStore) isWarm() bool { - s.mx.Lock() - defer s.mx.Unlock() - return s.warmupEpoch > 0 + return s.warmupEpoch.Load() > 0 } // State tracking @@ -757,7 +755,7 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error bs, err = s.ds.Get(s.ctx, warmupEpochKey) switch err { case nil: - s.warmupEpoch = bytesToEpoch(bs) + s.warmupEpoch.Store(bytesToInt64(bs)) case dstore.ErrNotFound: warmup = true @@ -791,7 +789,7 @@ func (s *SplitStore) Start(chain ChainAccessor, us stmgr.UpgradeSchedule) error return xerrors.Errorf("error loading compaction index: %w", err) } - log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch) + log.Infow("starting splitstore", "baseEpoch", s.baseEpoch, "warmupEpoch", s.warmupEpoch.Load()) if warmup { err = s.warmup(curTs) diff --git a/blockstore/splitstore/splitstore_check.go b/blockstore/splitstore/splitstore_check.go index 2645c78c5b5..bdc70627116 100644 --- a/blockstore/splitstore/splitstore_check.go +++ b/blockstore/splitstore/splitstore_check.go @@ -145,7 +145,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error { func (s *SplitStore) Info() map[string]interface{} { info := make(map[string]interface{}) info["base epoch"] = s.baseEpoch - info["warmup epoch"] = s.warmupEpoch + info["warmup epoch"] = s.warmupEpoch.Load() info["compactions"] = s.compactionIndex info["prunes"] = s.pruneIndex info["compacting"] = s.compacting == 1 diff --git a/blockstore/splitstore/splitstore_test.go b/blockstore/splitstore/splitstore_test.go index 4e168fc543c..63e77b47eaa 100644 --- a/blockstore/splitstore/splitstore_test.go +++ b/blockstore/splitstore/splitstore_test.go @@ -429,7 +429,7 @@ func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore. } defer ss.Close() //nolint - ss.warmupEpoch = 1 + ss.warmupEpoch.Store(1) go ss.reifyOrchestrator() waitForReification := func() { @@ -529,7 +529,7 @@ func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blocks } defer ss.Close() //nolint - ss.warmupEpoch = 1 + ss.warmupEpoch.Store(1) go ss.reifyOrchestrator() waitForReification := func() { diff --git a/blockstore/splitstore/splitstore_warmup.go b/blockstore/splitstore/splitstore_warmup.go index e387263dae7..7fb6f3b9d08 100644 --- a/blockstore/splitstore/splitstore_warmup.go +++ b/blockstore/splitstore/splitstore_warmup.go @@ -136,9 +136,8 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error { if err != nil { return xerrors.Errorf("error saving warm up epoch: %w", err) } - s.mx.Lock() - s.warmupEpoch = epoch - s.mx.Unlock() + + s.warmupEpoch.Store(int64(epoch)) // also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex)) diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index 81a65dd06bc..50f64f903ab 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -448,12 +448,8 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra return mp, nil } -func (mp *MessagePool) TryForEachPendingMessage(f func(cid.Cid) error) error { - // avoid deadlocks in splitstore compaction when something else needs to access the blockstore - // while holding the mpool lock - if !mp.lk.TryLock() { - return xerrors.Errorf("mpool TryForEachPendingMessage: could not acquire lock") - } +func (mp *MessagePool) ForEachPendingMessage(f func(cid.Cid) error) error { + mp.lk.Lock() defer mp.lk.Unlock() for _, mset := range mp.pending { diff --git a/node/modules/chain.go b/node/modules/chain.go index c4f6d644e9c..f91defc43c3 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -70,7 +70,7 @@ func MessagePool(lc fx.Lifecycle, mctx helpers.MetricsCtx, us stmgr.UpgradeSched return mp.Close() }, }) - protector.AddProtector(mp.TryForEachPendingMessage) + protector.AddProtector(mp.ForEachPendingMessage) return mp, nil }