Skip to content

Commit

Permalink
support out-of-chain reference protection
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed Jul 20, 2021
1 parent 1c30d07 commit ebbaf23
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 1 deletion.
10 changes: 10 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ type SplitStore struct {
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}

// registered protectors
protectors []func(func(cid.Cid) error) error
}

var _ bstore.Blockstore = (*SplitStore)(nil)
Expand Down Expand Up @@ -520,6 +523,13 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
return nil
}

func (s *SplitStore) AddProtector(protector func(func(cid.Cid) error) error) {
s.mx.Lock()
defer s.mx.Unlock()

s.protectors = append(s.protectors, protector)
}

func (s *SplitStore) Close() error {
if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
// already closing
Expand Down
32 changes: 32 additions & 0 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,30 @@ func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
})
}

func (s *SplitStore) applyProtectors() error {
s.mx.Lock()
defer s.mx.Unlock()

count := 0
for _, protect := range s.protectors {
err := protect(func(c cid.Cid) error {
s.trackTxnRef(c)
count++
return nil
})

if err != nil {
return xerrors.Errorf("error applynig protector: %w", err)
}
}

if count > 0 {
log.Infof("protected %d references through %d protectors", count, len(s.protectors))
}

return nil
}

// --- Compaction ---
// Compaction works transactionally with the following algorithm:
// - We prepare a transaction, whereby all i/o referenced objects through the API are tracked.
Expand Down Expand Up @@ -392,6 +416,14 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// we are ready for concurrent marking
s.beginTxnMarking(markSet)

// 0. track all protected references at beginning of compaction; anything added later should
// be transactionally protected by the write
log.Info("protecting references with registered protectors")
err = s.applyProtectors()
if err != nil {
return err
}

// 1. mark reachable objects by walking the chain from the current epoch; we keep state roots
// and messages until the boundary epoch.
log.Info("marking reachable objects")
Expand Down
21 changes: 21 additions & 0 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,27 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName, j journ
return mp, nil
}

func (mp *MessagePool) ProtectMessages(protect func(cid.Cid) error) error {
mp.lk.Lock()
defer mp.lk.Unlock()

for _, mset := range mp.pending {
for _, m := range mset.msgs {
err := protect(m.Cid())
if err != nil {
return err
}

err = protect(m.Message.Cid())
if err != nil {
return err
}
}
}

return nil
}

func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
// check the cache
a, f := mp.keyCache[addr]
Expand Down
1 change: 1 addition & 0 deletions chain/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (tma *testMpoolAPI) SubscribeHeadChanges(cb func(rev, app []*types.TipSet)
func (tma *testMpoolAPI) PutMessage(m types.ChainMsg) (cid.Cid, error) {
return cid.Undef, nil
}

func (tma *testMpoolAPI) IsLite() bool {
return false
}
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,14 @@ func Repo(r repo.Repo) Option {
Override(new(dtypes.BasicStateBlockstore), modules.StateSplitBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.SplitBlockstore))),
Override(new(dtypes.ExposedBlockstore), modules.ExposedSplitBlockstore),
Override(new(dtypes.GCReferenceProtector), modules.SplitBlockstoreGCReferenceProtector),
),
If(!cfg.EnableSplitstore,
Override(new(dtypes.BasicChainBlockstore), modules.ChainFlatBlockstore),
Override(new(dtypes.BasicStateBlockstore), modules.StateFlatBlockstore),
Override(new(dtypes.BaseBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.ExposedBlockstore), From(new(dtypes.UniversalBlockstore))),
Override(new(dtypes.GCReferenceProtector), modules.NoopGCReferenceProtector),
),

Override(new(dtypes.ChainBlockstore), From(new(dtypes.BasicChainBlockstore))),
Expand Down
8 changes: 8 additions & 0 deletions node/modules/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
}
}

func SplitBlockstoreGCReferenceProtector(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.GCReferenceProtector {
return s.(dtypes.GCReferenceProtector)
}

func NoopGCReferenceProtector(_ fx.Lifecycle) dtypes.GCReferenceProtector {
return dtypes.NoopGCReferenceProtector{}
}

func ExposedSplitBlockstore(_ fx.Lifecycle, s dtypes.SplitBlockstore) dtypes.ExposedBlockstore {
return s.(*splitstore.SplitStore).Expose()
}
Expand Down
3 changes: 2 additions & 1 deletion node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func ChainBlockService(bs dtypes.ExposedBlockstore, rem dtypes.ChainBitswap) dty
return blockservice.New(bs, rem)
}

func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal) (*messagepool.MessagePool, error) {
func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS, nn dtypes.NetworkName, j journal.Journal, protector dtypes.GCReferenceProtector) (*messagepool.MessagePool, error) {
mp, err := messagepool.New(mpp, ds, nn, j)
if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err)
Expand All @@ -68,6 +68,7 @@ func MessagePool(lc fx.Lifecycle, mpp messagepool.Provider, ds dtypes.MetadataDS
return mp.Close()
},
})
protector.AddProtector(mp.ProtectMessages)
return mp, nil
}

Expand Down
13 changes: 13 additions & 0 deletions node/modules/dtypes/protector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package dtypes

import (
cid "github.com/ipfs/go-cid"
)

type GCReferenceProtector interface {
AddProtector(func(func(cid.Cid) error) error)
}

type NoopGCReferenceProtector struct{}

func (p NoopGCReferenceProtector) AddProtector(func(func(cid.Cid) error) error) {}

0 comments on commit ebbaf23

Please sign in to comment.