Skip to content

Commit

Permalink
Add caching universal blockstore and statestore
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Dec 1, 2021
1 parent 61b2ba1 commit cdc84e0
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 8 deletions.
11 changes: 10 additions & 1 deletion commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lily/config"
"github.com/filecoin-project/lily/lens/lily"
"github.com/filecoin-project/lily/lens/lily/modules"
lutil "github.com/filecoin-project/lily/lens/util"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
)
Expand Down Expand Up @@ -67,6 +68,11 @@ type daemonOpts struct {

var daemonFlags daemonOpts

var CacheFlags struct {
BlockstoreCacheSize int // number of raw blocks to cache in memory
StatestoreCacheSize int // number of decoded actor states to cache in memory
}

var DaemonCmd = &cli.Command{
Name: "daemon",
Usage: "Start a lily daemon process.",
Expand Down Expand Up @@ -229,18 +235,21 @@ Note that jobs are not persisted between restarts of the daemon. See
var api lily.LilyAPI
stop, err := node.New(ctx,
// Start Sentinel Dep injection
LilyNodeAPIOption(&api),
LilyNodeAPIOption(&api, CacheFlags.StatestoreCacheSize),
node.Override(new(*config.Conf), modules.LoadConf(daemonFlags.config)),
node.Override(new(*events.Events), modules.NewEvents),
node.Override(new(*schedule.Scheduler), schedule.NewSchedulerDaemon),
node.Override(new(*storage.Catalog), modules.NewStorageCatalog),
node.Override(new(*lutil.CacheConfig), modules.CacheConfig(CacheFlags.BlockstoreCacheSize, CacheFlags.StatestoreCacheSize)),
// End Injection

node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdown),
node.Base(),
node.Repo(r),

node.Override(new(dtypes.UniversalBlockstore), modules.CachingUniversalBlockstore(CacheFlags.BlockstoreCacheSize)),

// Inject a custom StateManager, must be done after the node.Online() call as we are
// overriding the OG lotus StateManager.
node.Override(new(*stmgr.StateManager), modules.StateManager),
Expand Down
2 changes: 1 addition & 1 deletion commands/lily.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func NewSentinelNodeRPC(ctx context.Context, addr string, requestHeader http.Hea
}

// Lily Node settings for injection into lotus node.
func LilyNodeAPIOption(out *lily.LilyAPI, fopts ...node.Option) node.Option {
func LilyNodeAPIOption(out *lily.LilyAPI, statestoreCacheSize int, fopts ...node.Option) node.Option {
resAPI := &lily.LilyNodeAPI{}
opts := node.Options(
node.WithRepoType(repo.FullNode),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.uber.org/fx v1.9.0
go.uber.org/fx v1.15.0
go.uber.org/zap v1.19.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/cheggaaa/pb.v1 v1.0.28
Expand Down
10 changes: 7 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1840,13 +1840,16 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY=
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.9.0 h1:7OAz8ucp35AU8eydejpYG7QrbE8rLKzGhHbZlJi5LYY=
go.uber.org/dig v1.12.0 h1:l1GQeZpEbss0/M4l/ZotuBndCrkMdjnygzgcuOjAdaY=
go.uber.org/dig v1.12.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.9.0/go.mod h1:mFdUyAUuJ3w4jAckiKSKbldsxy1ojpAMJ+dVZg5Y0Aw=
go.uber.org/fx v1.15.0 h1:kcfBpAm98n0ksanyyZLFE/Q3T7yPi13Ge2liu3TxR+A=
go.uber.org/fx v1.15.0/go.mod h1:jI3RazQUhGv5KkpZIRv+kuP4CcgX3fnc0qX8bLnzbx8=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -2139,6 +2142,7 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210903071746-97244b99971b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7 h1:c20P3CcPbopVp2f7099WLOqSNKURf30Z0uq66HpijZY=
golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down
24 changes: 22 additions & 2 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
var _ LilyAPI = (*LilyNodeAPI)(nil)

type LilyNodeAPI struct {
fx.In
fx.In `ignore-unexported:"true"`

net.NetAPI
full.ChainAPI
Expand All @@ -43,6 +43,9 @@ type LilyNodeAPI struct {
Scheduler *schedule.Scheduler
StorageCatalog *storage.Catalog
ExecMonitor stmgr.ExecMonitor
CacheConfig *util.CacheConfig
actorStore adt.Store
actorStoreInit sync.Once
}

func (m *LilyNodeAPI) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
Expand Down Expand Up @@ -298,7 +301,24 @@ func (m *LilyNodeAPI) GetMessageExecutionsForTipSet(ctx context.Context, next *t
}

func (m *LilyNodeAPI) Store() adt.Store {
return m.ChainAPI.Chain.ActorStore(context.TODO())
m.actorStoreInit.Do(func() {
if m.CacheConfig.StatestoreCacheSize > 0 {
var err error
log.Infof("creating caching statestore with size=%d", m.CacheConfig.StatestoreCacheSize)
m.actorStore, err = util.NewCachingStateStore(m.ChainAPI.Chain.StateBlockstore(), m.CacheConfig.StatestoreCacheSize)
if err != nil {
log.Errorf("failed to create caching statestore: %v", err)
} else {
return // done
}
} else {
log.Infof("not creating caching statestore (size=%d)", m.CacheConfig.StatestoreCacheSize)
}

m.actorStore = m.ChainAPI.Chain.ActorStore(context.TODO())
})

return m.actorStore
}

func (m *LilyNodeAPI) StateGetReceipt(ctx context.Context, msg cid.Cid, from types.TipSetKey) (*types.MessageReceipt, error) {
Expand Down
50 changes: 50 additions & 0 deletions lens/lily/modules/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package modules

import (
"context"
"io"

"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"go.uber.org/fx"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/lens/util"
)

func CacheConfig(blockstoreCacheSize int, statestoreCacheSize int) func(lc fx.Lifecycle, mctx helpers.MetricsCtx) (*util.CacheConfig, error) {
return func(lc fx.Lifecycle, mctx helpers.MetricsCtx) (*util.CacheConfig, error) {
return &util.CacheConfig{
BlockstoreCacheSize: blockstoreCacheSize,
StatestoreCacheSize: statestoreCacheSize,
}, nil
}
}

func CachingUniversalBlockstore(cacheSize int) func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
return func(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
if err != nil {
return nil, err
}
if c, ok := bs.(io.Closer); ok {
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return c.Close()
},
})
}

if cacheSize <= 0 {
return nil, xerrors.Errorf("invalid value for blockstore cache size: must be a positive integer")
}

cbs, err := util.NewCachingBlockstore(bs, cacheSize)
if err != nil {
return nil, xerrors.Errorf("failed to create caching blockstore: %v", err)
}

return cbs, err
}
}
195 changes: 195 additions & 0 deletions lens/util/store.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package util

import (
"bytes"
"context"
"reflect"
"sync/atomic"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/specs-actors/actors/util/adt"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)

type CacheConfig struct {
BlockstoreCacheSize int
StatestoreCacheSize int
}

func NewCachingStore(backing blockstore.Blockstore) *ProxyingBlockstore {
bs := blockstore.NewMemorySync()

Expand Down Expand Up @@ -115,3 +127,186 @@ func (pb *ProxyingBlockstore) GetCount() int64 {
func (pb *ProxyingBlockstore) ResetMetrics() {
atomic.StoreInt64(&pb.gets, 0)
}

var _ blockstore.Blockstore = (*CachingBlockstore)(nil)

type CachingBlockstore struct {
cache *lru.ARCCache
blocks blockstore.Blockstore
reads int64 // updated atomically
hits int64 // updated atomically
bytes int64 // updated atomically
}

func NewCachingBlockstore(blocks blockstore.Blockstore, cacheSize int) (*CachingBlockstore, error) {
cache, err := lru.NewARC(cacheSize)
if err != nil {
return nil, xerrors.Errorf("new arc: %w", err)
}

return &CachingBlockstore{
cache: cache,
blocks: blocks,
}, nil
}

func (cs *CachingBlockstore) DeleteBlock(c cid.Cid) error {
return cs.blocks.DeleteBlock(c)
}

func (cs *CachingBlockstore) GetSize(c cid.Cid) (int, error) {
return cs.blocks.GetSize(c)
}

func (cs *CachingBlockstore) Put(blk blocks.Block) error {
return cs.blocks.Put(blk)
}

func (cs *CachingBlockstore) PutMany(blks []blocks.Block) error {
return cs.blocks.PutMany(blks)
}

func (cs *CachingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return cs.blocks.AllKeysChan(ctx)
}

func (cs *CachingBlockstore) HashOnRead(enabled bool) {
cs.blocks.HashOnRead(enabled)
}

func (cs *CachingBlockstore) DeleteMany(cids []cid.Cid) error {
return cs.blocks.DeleteMany(cids)
}

func (cs *CachingBlockstore) Get(c cid.Cid) (blocks.Block, error) {
reads := atomic.AddInt64(&cs.reads, 1)
if reads%1000000 == 0 {
hits := atomic.LoadInt64(&cs.hits)
by := atomic.LoadInt64(&cs.bytes)
log.Debugw("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by)
}

v, hit := cs.cache.Get(c)
if hit {
atomic.AddInt64(&cs.hits, 1)
return v.(blocks.Block), nil
}

blk, err := cs.blocks.Get(c)
if err != nil {
return nil, err
}

atomic.AddInt64(&cs.bytes, int64(len(blk.RawData())))
cs.cache.Add(c, blk)
return blk, err
}

func (cs *CachingBlockstore) View(c cid.Cid, callback func([]byte) error) error {
reads := atomic.AddInt64(&cs.reads, 1)
if reads%1000000 == 0 {
hits := atomic.LoadInt64(&cs.hits)
by := atomic.LoadInt64(&cs.bytes)
log.Debugw("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by)
}
v, hit := cs.cache.Get(c)
if hit {
atomic.AddInt64(&cs.hits, 1)
return callback(v.(blocks.Block).RawData())
}

blk, err := cs.blocks.Get(c)
if err != nil {
return err
}

atomic.AddInt64(&cs.bytes, int64(len(blk.RawData())))
cs.cache.Add(c, blk)
return callback(blk.RawData())
}

func (cs *CachingBlockstore) Has(c cid.Cid) (bool, error) {
atomic.AddInt64(&cs.reads, 1)
// Safe to query cache since blockstore never deletes
if cs.cache.Contains(c) {
return true, nil
}

return cs.blocks.Has(c)
}

var _ adt.Store = (*CachingStateStore)(nil)

type CachingStateStore struct {
cache *lru.ARCCache
blocks blockstore.Blockstore
store adt.Store
reads int64 // updated atomically
hits int64 // updated atomically
}

func NewCachingStateStore(blocks blockstore.Blockstore, cacheSize int) (*CachingStateStore, error) {
cache, err := lru.NewARC(cacheSize)
if err != nil {
return nil, xerrors.Errorf("new arc: %w", err)
}

store := adt.WrapStore(context.Background(), cbor.NewCborStore(blocks))

return &CachingStateStore{
cache: cache,
store: store,
blocks: blocks,
}, nil
}

func (cas *CachingStateStore) Context() context.Context {
return context.Background()
}

func (cas *CachingStateStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
reads := atomic.AddInt64(&cas.reads, 1)
if reads%1000000 == 0 {
hits := atomic.LoadInt64(&cas.hits)
log.Debugw("CachingStateStore stats", "reads", reads, "cache_len", cas.cache.Len(), "hit_rate", float64(hits)/float64(reads))
}

cu, ok := out.(cbg.CBORUnmarshaler)
if !ok {
return xerrors.Errorf("out parameter does not implement CBORUnmarshaler")
}

v, hit := cas.cache.Get(c)
if hit {
atomic.AddInt64(&cas.hits, 1)

o := reflect.ValueOf(out).Elem()
if !o.CanSet() {
return xerrors.Errorf("out parameter cannot be set")
}

if !v.(reflect.Value).Type().AssignableTo(o.Type()) {
return xerrors.Errorf("out parameter cannot be assigned cached value")
}

o.Set(v.(reflect.Value))
return nil
}

blk, err := cas.blocks.Get(c)
if err != nil {
return err
}

if err := cu.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil {
return cbor.NewSerializationError(err)
}

o := reflect.ValueOf(out).Elem()
cas.cache.Add(c, o)
return nil
}

func (cas *CachingStateStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cas.store.Put(ctx, v)
}
Loading

0 comments on commit cdc84e0

Please sign in to comment.