Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Splitstore: add support for protecting out of chain references in the blockstore #6777

Merged
merged 4 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type SplitStore struct {
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}

// registered protectors
protectors []func(func(cid.Cid) error) error
}

var _ bstore.Blockstore = (*SplitStore)(nil)
Expand Down Expand Up @@ -520,6 +523,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return nil
}

func (s *SplitStore) AddProtector(protector func(func(cid.Cid) error) error) {
s.mx.Lock()
defer s.mx.Unlock()

s.protectors = append(s.protectors, protector)
}

func (s *SplitStore) Close() error {
if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
// already closing
Expand Down
32 changes: 32 additions & 0 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,30 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
})
}

func (s *SplitStore) applyProtectors() error {
s.mx.Lock()
defer s.mx.Unlock()

count := 0
for _, protect := range s.protectors {
err := protect(func(c cid.Cid) error {
s.trackTxnRef(c)
count++
return nil
})

if err != nil {
return xerrors.Errorf("error applynig protector: %w", err)
}
}

if count > 0 {
log.Infof("protected %d references through %d protectors", count, len(s.protectors))
}

return nil
}

// --- Compaction ---
// Compaction works transactionally with the following algorithm:
// - We prepare a transaction, whereby all i/o referenced objects through the API are tracked.
Expand Down Expand Up @@ -392,6 +416,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// we are ready for concurrent marking
s.beginTxnMarking(markSet)

// 0. track all protected references at beginning of compaction; anything added later should
// be transactionally protected by the write
log.Info("protecting references with registered protectors")
err = s.applyProtectors()
if err != nil {
return err
}

// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
log.Info("marking reachable objects")
Expand Down
60 changes: 54 additions & 6 deletions blockstore/splitstore/splitstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,32 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Fatal(err)
}

// create a garbage block that is protected with a rgistered protector
protected := blocks.NewBlock([]byte("protected!"))
err = hot.Put(protected)
if err != nil {
t.Fatal(err)
}

// and another one that is not protected
unprotected := blocks.NewBlock([]byte("unprotected!"))
err = hot.Put(unprotected)
if err != nil {
t.Fatal(err)
}

// open the splitstore
ss, err := Open("", ds, hot, cold, cfg)
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint

// register our protector
ss.AddProtector(func(protect func(cid.Cid) error) error {
return protect(protected.Cid())
})

err = ss.Start(chain)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -132,8 +151,8 @@ func testSplitStore(t *testing.T, cfg *Config) {
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
}

if hotCnt != 10 {
t.Errorf("expected %d blocks, but got %d", 10, hotCnt)
if hotCnt != 12 {
t.Errorf("expected %d blocks, but got %d", 12, hotCnt)
}

// trigger a compaction
Expand All @@ -146,12 +165,41 @@ func testSplitStore(t *testing.T, cfg *Config) {
coldCnt = countBlocks(cold)
hotCnt = countBlocks(hot)

if coldCnt != 5 {
t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt)
if coldCnt != 6 {
t.Errorf("expected %d cold blocks, but got %d", 6, coldCnt)
}

if hotCnt != 18 {
t.Errorf("expected %d hot blocks, but got %d", 18, hotCnt)
}

// ensure our protected block is still there
has, err := hot.Has(protected.Cid())
if err != nil {
t.Fatal(err)
}

if !has {
t.Fatal("protected block is missing from hotstore")
}

// ensure our unprotected block is in the coldstore now
has, err = hot.Has(unprotected.Cid())
if err != nil {
t.Fatal(err)
}

if has {
t.Fatal("unprotected block is still in hotstore")
}

has, err = cold.Has(unprotected.Cid())
if err != nil {
t.Fatal(err)
}

if hotCnt != 17 {
t.Errorf("expected %d hot blocks, but got %d", 17, hotCnt)
if !has {
t.Fatal("unprotected block is missing from coldstore")
}

// Make sure we can revert without panicking.
Expand Down
21 changes: 21 additions & 0 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
return mp, nil
}

func (mp *MessagePool) ProtectMessages(protect func(cid.Cid) error) error {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
mp.lk.Lock()
defer mp.lk.Unlock()

for _, mset := range mp.pending {
for _, m := range mset.msgs {
err := protect(m.Cid())
if err != nil {
return err
}

err = protect(m.Message.Cid())
if err != nil {
return err
}
}
}

return nil
}

func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache
a, f := mp.keyCache[addr]
Expand Down
1 change: 1 addition & 0 deletions chain/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet)
func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) {
return cid.Undef, nil
}

func (tma *testMpoolAPI) IsLite() bool {
return false
}
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,14 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore),
Override(new(dtypes.GCReferenceProtector), modules.SplitBlockstoreGCReferenceProtector),
),
If(!cfg.EnableSplitstore,
Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore),
Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector),
),

Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))),
Expand Down
8 changes: 8 additions & 0 deletions node/modules/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}
}

func SplitBlockstoreGCReferenceProtector(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.GCReferenceProtector {
return s.(dtypes.GCReferenceProtector)
}

func NoopGCReferenceProtector(_ fx.Lifecycle) dtypes.GCReferenceProtector {
return dtypes.NoopGCReferenceProtector{}
}

func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore {
return s.(*splitstore.SplitStore).Expose()
}
Expand Down
3 changes: 2 additions & 1 deletion node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
return blockservice.New(bs, rem)
}

func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really a dependency, so I assume this would be a separate "invoke" step (next to the splitstore step). But I don't have strong opinions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I have strong opinions. Just not about this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could do it with an invoke step, but doing it here avoids all possible races I think.
No strong opinions on this either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically I want to make sure that AddProtector invocations all happen before Start in the splitstore.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that Start gets called from a hook, which runs after all invokes (IIRC).

mp, err := messagepool.New(mpp, ds, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
Expand All @@ -68,6 +68,7 @@ func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS
return mp.Close()
},
})
protector.AddProtector(mp.ProtectMessages)
return mp, nil
}

Expand Down
13 changes: 13 additions & 0 deletions node/modules/dtypes/protector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dtypes

import (
cid "github.com/ipfs/go-cid"
)

type GCReferenceProtector interface {
AddProtector(func(func(cid.Cid) error) error)
}

type NoopGCReferenceProtector struct{}

func (p NoopGCReferenceProtector) AddProtector(func(func(cid.Cid) error) error) {}