Skip to content

Commit

Permalink
Merge branch 'master' into nonsense/fix-double-close-panic
Browse files Browse the repository at this point in the history
  • Loading branch information
Stebalien authored Jul 20, 2021
2 parents 26a6740 + 8ddd12d commit 1510ab7
Show file tree
Hide file tree
Showing 12 changed files with 155 additions and 8 deletions.
10 changes: 10 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type SplitStore struct {
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}

// registered protectors
protectors []func(func(cid.Cid) error) error
}

var _ bstore.Blockstore = (*SplitStore)(nil)
Expand Down Expand Up @@ -520,6 +523,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return nil
}

func (s *SplitStore) AddProtector(protector func(func(cid.Cid) error) error) {
s.mx.Lock()
defer s.mx.Unlock()

s.protectors = append(s.protectors, protector)
}

func (s *SplitStore) Close() error {
if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
// already closing
Expand Down
32 changes: 32 additions & 0 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,30 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
})
}

func (s *SplitStore) applyProtectors() error {
s.mx.Lock()
defer s.mx.Unlock()

count := 0
for _, protect := range s.protectors {
err := protect(func(c cid.Cid) error {
s.trackTxnRef(c)
count++
return nil
})

if err != nil {
return xerrors.Errorf("error applynig protector: %w", err)
}
}

if count > 0 {
log.Infof("protected %d references through %d protectors", count, len(s.protectors))
}

return nil
}

// --- Compaction ---
// Compaction works transactionally with the following algorithm:
// - We prepare a transaction, whereby all i/o referenced objects through the API are tracked.
Expand Down Expand Up @@ -392,6 +416,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// we are ready for concurrent marking
s.beginTxnMarking(markSet)

// 0. track all protected references at beginning of compaction; anything added later should
// be transactionally protected by the write
log.Info("protecting references with registered protectors")
err = s.applyProtectors()
if err != nil {
return err
}

// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
log.Info("marking reachable objects")
Expand Down
60 changes: 54 additions & 6 deletions blockstore/splitstore/splitstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,32 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Fatal(err)
}

// create a garbage block that is protected with a rgistered protector
protected := blocks.NewBlock([]byte("protected!"))
err = hot.Put(protected)
if err != nil {
t.Fatal(err)
}

// and another one that is not protected
unprotected := blocks.NewBlock([]byte("unprotected!"))
err = hot.Put(unprotected)
if err != nil {
t.Fatal(err)
}

// open the splitstore
ss, err := Open("", ds, hot, cold, cfg)
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint

// register our protector
ss.AddProtector(func(protect func(cid.Cid) error) error {
return protect(protected.Cid())
})

err = ss.Start(chain)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -132,8 +151,8 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
}

if hotCnt != 10 {
t.Errorf("expected %d blocks, but got %d", 10, hotCnt)
if hotCnt != 12 {
t.Errorf("expected %d blocks, but got %d", 12, hotCnt)
}

// trigger a compaction
Expand All @@ -146,12 +165,41 @@ func testSplitStore(t *testing.T, cfg *Config) {
coldCnt = countBlocks(cold)
hotCnt = countBlocks(hot)

if coldCnt != 5 {
t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt)
if coldCnt != 6 {
t.Errorf("expected %d cold blocks, but got %d", 6, coldCnt)
}

if hotCnt != 18 {
t.Errorf("expected %d hot blocks, but got %d", 18, hotCnt)
}

// ensure our protected block is still there
has, err := hot.Has(protected.Cid())
if err != nil {
t.Fatal(err)
}

if !has {
t.Fatal("protected block is missing from hotstore")
}

// ensure our unprotected block is in the coldstore now
has, err = hot.Has(unprotected.Cid())
if err != nil {
t.Fatal(err)
}

if has {
t.Fatal("unprotected block is still in hotstore")
}

has, err = cold.Has(unprotected.Cid())
if err != nil {
t.Fatal(err)
}

if hotCnt != 17 {
t.Errorf("expected %d hot blocks, but got %d", 17, hotCnt)
if !has {
t.Fatal("unprotected block is missing from coldstore")
}

// Make sure we can revert without panicking.
Expand Down
21 changes: 21 additions & 0 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
return mp, nil
}

func (mp *MessagePool) ForEachPendingMessage(f func(cid.Cid) error) error {
mp.lk.Lock()
defer mp.lk.Unlock()

for _, mset := range mp.pending {
for _, m := range mset.msgs {
err := f(m.Cid())
if err != nil {
return err
}

err = f(m.Message.Cid())
if err != nil {
return err
}
}
}

return nil
}

func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache
a, f := mp.keyCache[addr]
Expand Down
1 change: 1 addition & 0 deletions chain/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet)
func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) {
return cid.Undef, nil
}

func (tma *testMpoolAPI) IsLite() bool {
return false
}
Expand Down
4 changes: 4 additions & 0 deletions extern/storage-sealing/currentdealinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (mgr *CurrentDealInfoManager) dealIDFromPublishDealsMsg(ctx context.Context
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err)
}

if lookup == nil {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: not found", publishCid)
}

if lookup.Receipt.ExitCode != exitcode.Ok {
return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode)
}
Expand Down
8 changes: 7 additions & 1 deletion extern/storage-sealing/currentdealinfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/require"
)

var errNotFound = errors.New("Could not find")
var errNotFound = errors.New("could not find")

func TestGetCurrentDealInfo(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -180,6 +180,12 @@ func TestGetCurrentDealInfo(t *testing.T) {
expectedDealID: zeroDealID,
expectedError: xerrors.Errorf("looking for publish deal message %s: search msg failed: something went wrong", dummyCid),
},
"search message not found": {
publishCid: dummyCid,
targetProposal: &proposal,
expectedDealID: zeroDealID,
expectedError: xerrors.Errorf("looking for publish deal message %s: not found", dummyCid),
},
"return code not ok": {
publishCid: dummyCid,
searchMessageLookup: &MsgLookup{
Expand Down
1 change: 1 addition & 0 deletions markets/storageadapter/dealpublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

func TestDealPublisher(t *testing.T) {
t.Skip("this test randomly fails in various subtests; see issue #6799")
testCases := []struct {
name string
publishPeriod time.Duration
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,14 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore),
Override(new(dtypes.GCReferenceProtector), modules.SplitBlockstoreGCReferenceProtector),
),
If(!cfg.EnableSplitstore,
Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore),
Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector),
),

Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))),
Expand Down
8 changes: 8 additions & 0 deletions node/modules/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}
}

func SplitBlockstoreGCReferenceProtector(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.GCReferenceProtector {
return s.(dtypes.GCReferenceProtector)
}

func NoopGCReferenceProtector(_ fx.Lifecycle) dtypes.GCReferenceProtector {
return dtypes.NoopGCReferenceProtector{}
}

func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore {
return s.(*splitstore.SplitStore).Expose()
}
Expand Down
3 changes: 2 additions & 1 deletion node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
return blockservice.New(bs, rem)
}

func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
mp, err := messagepool.New(mpp, ds, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
Expand All @@ -68,6 +68,7 @@ func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS
return mp.Close()
},
})
protector.AddProtector(mp.ForEachPendingMessage)
return mp, nil
}

Expand Down
13 changes: 13 additions & 0 deletions node/modules/dtypes/protector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dtypes

import (
cid "github.com/ipfs/go-cid"
)

type GCReferenceProtector interface {
AddProtector(func(func(cid.Cid) error) error)
}

type NoopGCReferenceProtector struct{}

func (p NoopGCReferenceProtector) AddProtector(func(func(cid.Cid) error) error) {}

0 comments on commit 1510ab7

Please sign in to comment.