Skip to content

Commit

Permalink
Add block and actor caches (#766)
Browse files Browse the repository at this point in the history
* Add caching universal blockstore and statestore

* Address review comments
  • Loading branch information
iand authored Jan 11, 2022
1 parent 6ba705b commit df5bcfa
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 6 deletions.
21 changes: 21 additions & 0 deletions commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lily/config"
"github.com/filecoin-project/lily/lens/lily"
"github.com/filecoin-project/lily/lens/lily/modules"
lutil "github.com/filecoin-project/lily/lens/util"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
)
Expand Down Expand Up @@ -67,6 +68,11 @@ type daemonOpts struct {

var daemonFlags daemonOpts

// cacheFlags receives the values of the cache sizing command line flags
// (blockstore-cache-size and statestore-cache-size). Both flags default to 0.
var cacheFlags struct {
BlockstoreCacheSize uint // number of raw blocks to cache in memory
StatestoreCacheSize uint // number of decoded actor states to cache in memory
}

var DaemonCmd = &cli.Command{
Name: "daemon",
Usage: "Start a lily daemon process.",
Expand Down Expand Up @@ -148,6 +154,18 @@ Note that jobs are not persisted between restarts of the daemon. See
EnvVars: []string{"LILY_GENESIS"},
Destination: &daemonFlags.genesis,
},
&cli.UintFlag{
Name: "blockstore-cache-size",
EnvVars: []string{"LILY_BLOCKSTORE_CACHE_SIZE"},
Value: 0,
Destination: &cacheFlags.BlockstoreCacheSize,
},
&cli.UintFlag{
Name: "statestore-cache-size",
EnvVars: []string{"LILY_STATESTORE_CACHE_SIZE"},
Value: 0,
Destination: &cacheFlags.StatestoreCacheSize,
},
},
Action: func(c *cli.Context) error {
lotuslog.SetupLogLevels()
Expand Down Expand Up @@ -234,13 +252,16 @@ Note that jobs are not persisted between restarts of the daemon. See
node.Override(new(*events.Events), modules.NewEvents),
node.Override(new(*schedule.Scheduler), schedule.NewSchedulerDaemon),
node.Override(new(*storage.Catalog), modules.NewStorageCatalog),
node.Override(new(*lutil.CacheConfig), modules.CacheConfig(cacheFlags.BlockstoreCacheSize, cacheFlags.StatestoreCacheSize)),
// End Injection

node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdown),
node.Base(),
node.Repo(r),

node.Override(new(dtypes.UniversalBlockstore), modules.NewCachingUniversalBlockstore),

// Inject a custom StateManager, must be done after the node.Online() call as we are
// overriding the OG lotus StateManager.
node.Override(new(*stmgr.StateManager), modules.StateManager),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.uber.org/fx v1.9.0
go.uber.org/fx v1.15.0
go.uber.org/zap v1.19.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/cheggaaa/pb.v1 v1.0.28
Expand Down
10 changes: 7 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1840,13 +1840,16 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY=
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.9.0 h1:7OAz8ucp35AU8eydejpYG7QrbE8rLKzGhHbZlJi5LYY=
go.uber.org/dig v1.12.0 h1:l1GQeZpEbss0/M4l/ZotuBndCrkMdjnygzgcuOjAdaY=
go.uber.org/dig v1.12.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.9.0/go.mod h1:mFdUyAUuJ3w4jAckiKSKbldsxy1ojpAMJ+dVZg5Y0Aw=
go.uber.org/fx v1.15.0 h1:kcfBpAm98n0ksanyyZLFE/Q3T7yPi13Ge2liu3TxR+A=
go.uber.org/fx v1.15.0/go.mod h1:jI3RazQUhGv5KkpZIRv+kuP4CcgX3fnc0qX8bLnzbx8=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -2139,6 +2142,7 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210903071746-97244b99971b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7 h1:c20P3CcPbopVp2f7099WLOqSNKURf30Z0uq66HpijZY=
golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down
22 changes: 20 additions & 2 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
var _ LilyAPI = (*LilyNodeAPI)(nil)

type LilyNodeAPI struct {
fx.In
fx.In `ignore-unexported:"true"`

net.NetAPI
full.ChainAPI
Expand All @@ -43,6 +43,9 @@ type LilyNodeAPI struct {
Scheduler *schedule.Scheduler
StorageCatalog *storage.Catalog
ExecMonitor stmgr.ExecMonitor
CacheConfig *util.CacheConfig
actorStore adt.Store
actorStoreInit sync.Once
}

func (m *LilyNodeAPI) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
Expand Down Expand Up @@ -306,7 +309,22 @@ func (m *LilyNodeAPI) GetMessageExecutionsForTipSet(ctx context.Context, next *t
}

// Store returns the adt.Store used to load actor state.
//
// The store is created lazily on the first call and reused by every later
// call. When a positive StatestoreCacheSize is configured, a caching state
// store is built over the chain's state blockstore. If no cache is configured,
// or the caching store cannot be created, the chain's regular actor store is
// used instead.
func (m *LilyNodeAPI) Store() adt.Store {
	m.actorStoreInit.Do(func() {
		if m.CacheConfig != nil && m.CacheConfig.StatestoreCacheSize > 0 {
			log.Infof("creating caching statestore with size=%d", m.CacheConfig.StatestoreCacheSize)
			store, err := util.NewCachingStateStore(m.ChainAPI.Chain.StateBlockstore(), int(m.CacheConfig.StatestoreCacheSize))
			if err == nil {
				m.actorStore = store
				return // done
			}

			// Caching is best-effort: report the failure and fall back to the
			// uncached actor store below rather than failing the caller.
			log.Errorf("failed to create caching statestore: %v", err)
		}

		m.actorStore = m.ChainAPI.Chain.ActorStore(context.TODO())
	})

	return m.actorStore
}

func (m *LilyNodeAPI) StateGetReceipt(ctx context.Context, msg cid.Cid, from types.TipSetKey) (*types.MessageReceipt, error) {
Expand Down
49 changes: 49 additions & 0 deletions lens/lily/modules/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package modules

import (
"context"
"io"

"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"go.uber.org/fx"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/lens/util"
)

// CacheConfig builds a constructor that yields a *util.CacheConfig carrying
// the given blockstore and statestore cache sizes. The lifecycle and metrics
// context arguments of the returned constructor are accepted but not used.
func CacheConfig(blockstoreCacheSize uint, statestoreCacheSize uint) func(lc fx.Lifecycle, mctx helpers.MetricsCtx) (*util.CacheConfig, error) {
	template := util.CacheConfig{
		BlockstoreCacheSize: blockstoreCacheSize,
		StatestoreCacheSize: statestoreCacheSize,
	}

	return func(_ fx.Lifecycle, _ helpers.MetricsCtx) (*util.CacheConfig, error) {
		// Hand out a fresh copy on every invocation.
		cfg := template
		return &cfg, nil
	}
}

// NewCachingUniversalBlockstore opens the repo's universal blockstore and,
// when cc.BlockstoreCacheSize is non-zero, wraps it in an in-memory block
// cache of that many entries. With a size of zero the opened blockstore is
// returned unwrapped.
//
// If the opened blockstore implements io.Closer, an OnStop hook is appended to
// lc so that it is closed when the lifecycle stops.
func NewCachingUniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, cc *util.CacheConfig, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
	bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
	if err != nil {
		return nil, err
	}
	if c, ok := bs.(io.Closer); ok {
		lc.Append(fx.Hook{
			OnStop: func(_ context.Context) error {
				return c.Close()
			},
		})
	}

	if cc.BlockstoreCacheSize == 0 {
		return bs, nil
	}

	log.Infof("creating caching blockstore with size=%d", cc.BlockstoreCacheSize)
	cbs, err := util.NewCachingBlockstore(bs, int(cc.BlockstoreCacheSize))
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying error.
		return nil, xerrors.Errorf("failed to create caching blockstore: %w", err)
	}

	return cbs, nil
}
195 changes: 195 additions & 0 deletions lens/util/store.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package util

import (
"bytes"
"context"
"reflect"
"sync/atomic"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/specs-actors/actors/util/adt"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)

// CacheConfig carries the sizes of the in-memory caches. Each size is a count
// of cache entries, not a number of bytes.
type CacheConfig struct {
// BlockstoreCacheSize is the number of entries for the block cache.
BlockstoreCacheSize uint
// StatestoreCacheSize is the number of entries for the decoded state cache.
StatestoreCacheSize uint
}

func NewCachingStore(backing blockstore.Blockstore) *ProxyingBlockstore {
bs := blockstore.NewMemorySync()

Expand Down Expand Up @@ -115,3 +127,186 @@ func (pb *ProxyingBlockstore) GetCount() int64 {
// ResetMetrics atomically sets the gets counter back to zero.
func (pb *ProxyingBlockstore) ResetMetrics() {
atomic.StoreInt64(&pb.gets, 0)
}

var _ blockstore.Blockstore = (*CachingBlockstore)(nil)

// CachingBlockstore is a blockstore that keeps blocks fetched from a backing
// blockstore in an in-memory ARC cache keyed by CID.
type CachingBlockstore struct {
cache *lru.ARCCache // fetched blocks, keyed by cid.Cid
blocks blockstore.Blockstore // backing blockstore that all writes and cache misses go to
reads int64 // updated atomically
hits int64 // updated atomically
bytes int64 // updated atomically
}

// NewCachingBlockstore wraps blocks with an ARC cache that holds up to
// cacheSize entries. An error is returned when the cache cannot be created.
func NewCachingBlockstore(blocks blockstore.Blockstore, cacheSize int) (*CachingBlockstore, error) {
	arc, err := lru.NewARC(cacheSize)
	if err != nil {
		return nil, xerrors.Errorf("new arc: %w", err)
	}

	cbs := new(CachingBlockstore)
	cbs.cache = arc
	cbs.blocks = blocks
	return cbs, nil
}

// DeleteBlock removes the block identified by c from the backing blockstore
// and evicts any cached copy, so that later Get, View and Has calls do not
// keep serving a block that no longer exists.
func (cs *CachingBlockstore) DeleteBlock(c cid.Cid) error {
	err := cs.blocks.DeleteBlock(c)

	// Evict even when the backing store reports an error: dropping a cache
	// entry is always safe, whereas keeping a stale one is not.
	cs.cache.Remove(c)
	return err
}

// GetSize asks the backing blockstore for the size of the block identified by
// c. The cache is not consulted.
func (cs *CachingBlockstore) GetSize(c cid.Cid) (int, error) {
	size, err := cs.blocks.GetSize(c)
	return size, err
}

// Put writes blk to the backing blockstore. The cache is left untouched.
func (cs *CachingBlockstore) Put(blk blocks.Block) error {
	backing := cs.blocks
	return backing.Put(blk)
}

// PutMany writes all of blks to the backing blockstore. The cache is left
// untouched.
func (cs *CachingBlockstore) PutMany(blks []blocks.Block) error {
	backing := cs.blocks
	return backing.PutMany(blks)
}

// AllKeysChan returns the key channel provided by the backing blockstore for
// the given context.
func (cs *CachingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	keys, err := cs.blocks.AllKeysChan(ctx)
	return keys, err
}

// HashOnRead forwards the setting to the backing blockstore.
func (cs *CachingBlockstore) HashOnRead(enabled bool) {
cs.blocks.HashOnRead(enabled)
}

// DeleteMany removes the blocks identified by cids from the backing blockstore
// and evicts any cached copies, so that later Get, View and Has calls do not
// keep serving blocks that no longer exist.
func (cs *CachingBlockstore) DeleteMany(cids []cid.Cid) error {
	err := cs.blocks.DeleteMany(cids)

	// Evict every requested key even on error: the backing store may have
	// removed some of the blocks before failing, and dropping a cache entry is
	// always safe.
	for _, c := range cids {
		cs.cache.Remove(c)
	}
	return err
}

// Get returns the block identified by c, serving it from the cache when
// present and otherwise loading it from the backing blockstore and caching it.
// Every millionth read emits a debug line with the cache statistics.
func (cs *CachingBlockstore) Get(c cid.Cid) (blocks.Block, error) {
	if total := atomic.AddInt64(&cs.reads, 1); total%1000000 == 0 {
		hitCount := atomic.LoadInt64(&cs.hits)
		byteCount := atomic.LoadInt64(&cs.bytes)
		log.Debugw("CachingBlockstore stats", "reads", total, "cache_len", cs.cache.Len(), "hit_rate", float64(hitCount)/float64(total), "bytes_read", byteCount)
	}

	if cached, found := cs.cache.Get(c); found {
		atomic.AddInt64(&cs.hits, 1)
		return cached.(blocks.Block), nil
	}

	fetched, err := cs.blocks.Get(c)
	if err != nil {
		return nil, err
	}

	// Account for the bytes pulled from the backing store, then remember the block.
	size := int64(len(fetched.RawData()))
	atomic.AddInt64(&cs.bytes, size)
	cs.cache.Add(c, fetched)
	return fetched, nil
}

// View invokes callback with the raw data of the block identified by c. The
// block is taken from the cache when present; otherwise it is loaded from the
// backing blockstore and cached first. Every millionth read emits a debug line
// with the cache statistics.
func (cs *CachingBlockstore) View(c cid.Cid, callback func([]byte) error) error {
	if total := atomic.AddInt64(&cs.reads, 1); total%1000000 == 0 {
		hitCount := atomic.LoadInt64(&cs.hits)
		byteCount := atomic.LoadInt64(&cs.bytes)
		log.Debugw("CachingBlockstore stats", "reads", total, "cache_len", cs.cache.Len(), "hit_rate", float64(hitCount)/float64(total), "bytes_read", byteCount)
	}

	if cached, found := cs.cache.Get(c); found {
		atomic.AddInt64(&cs.hits, 1)
		data := cached.(blocks.Block).RawData()
		return callback(data)
	}

	fetched, err := cs.blocks.Get(c)
	if err != nil {
		return err
	}

	// Account for the bytes pulled from the backing store, then remember the block.
	size := int64(len(fetched.RawData()))
	atomic.AddInt64(&cs.bytes, size)
	cs.cache.Add(c, fetched)
	return callback(fetched.RawData())
}

// Has reports whether the block identified by c is available. A block found in
// the cache is reported as present without consulting the backing blockstore;
// otherwise the backing blockstore is asked.
//
// The call counts as a read, and a cache hit counts as a hit, so that the hit
// rate logged by Get and View is not understated by Has traffic.
func (cs *CachingBlockstore) Has(c cid.Cid) (bool, error) {
	atomic.AddInt64(&cs.reads, 1)

	// NOTE(review): a cached entry is trusted here, which is only correct while
	// cached blocks are never removed from the backing store behind the cache.
	if cs.cache.Contains(c) {
		atomic.AddInt64(&cs.hits, 1)
		return true, nil
	}

	return cs.blocks.Has(c)
}

var _ adt.Store = (*CachingStateStore)(nil)

// CachingStateStore is an adt.Store that keeps decoded values in an in-memory
// ARC cache keyed by CID, so that repeated Get calls for the same CID avoid
// loading and decoding the block again.
type CachingStateStore struct {
cache *lru.ARCCache // decoded values (stored as reflect.Value), keyed by cid.Cid
blocks blockstore.Blockstore // source of raw blocks on a cache miss
store adt.Store // wrapped store over blocks; handles Put
reads int64 // updated atomically
hits int64 // updated atomically
}

// NewCachingStateStore builds a CachingStateStore over blocks with an ARC
// cache of up to cacheSize entries. An error is returned when the cache cannot
// be created.
func NewCachingStateStore(blocks blockstore.Blockstore, cacheSize int) (*CachingStateStore, error) {
	arc, err := lru.NewARC(cacheSize)
	if err != nil {
		return nil, xerrors.Errorf("new arc: %w", err)
	}

	cborStore := cbor.NewCborStore(blocks)

	css := &CachingStateStore{cache: arc, blocks: blocks}
	css.store = adt.WrapStore(context.Background(), cborStore)
	return css, nil
}

// Context always returns context.Background().
func (cas *CachingStateStore) Context() context.Context {
return context.Background()
}

// Get loads the value stored under c into out.
//
// out must be a non-nil pointer whose type implements cbg.CBORUnmarshaler. On
// a cache hit the cached value is assigned to *out without touching the
// blockstore. An error is returned if the cached value's type is not
// assignable to *out. On a miss the block is read from the blockstore, decoded
// into out, and a copy of the decoded value is cached.
//
// The cache stores its own copy of the decoded value rather than a reference
// to the caller's variable, so a caller that later reuses or modifies *out
// does not alter the cache entry. The copy is shallow: slices, maps and
// pointers inside the value are shared between the cache and every caller, so
// callers must treat those as read-only.
//
// Every millionth read emits a debug line with the cache statistics. The ctx
// argument is currently unused.
func (cas *CachingStateStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
	reads := atomic.AddInt64(&cas.reads, 1)
	if reads%1000000 == 0 {
		hits := atomic.LoadInt64(&cas.hits)
		log.Debugw("CachingStateStore stats", "reads", reads, "cache_len", cas.cache.Len(), "hit_rate", float64(hits)/float64(reads))
	}

	cu, ok := out.(cbg.CBORUnmarshaler)
	if !ok {
		return xerrors.Errorf("out parameter does not implement CBORUnmarshaler")
	}

	// Reject anything that cannot be dereferenced up front instead of
	// panicking inside reflect or the unmarshaler.
	ptr := reflect.ValueOf(out)
	if ptr.Kind() != reflect.Ptr || ptr.IsNil() {
		return xerrors.Errorf("out parameter must be a non-nil pointer")
	}
	o := ptr.Elem()

	if v, hit := cas.cache.Get(c); hit {
		atomic.AddInt64(&cas.hits, 1)

		if !o.CanSet() {
			return xerrors.Errorf("out parameter cannot be set")
		}

		cached := v.(reflect.Value)
		if !cached.Type().AssignableTo(o.Type()) {
			return xerrors.Errorf("out parameter cannot be assigned cached value")
		}

		o.Set(cached)
		return nil
	}

	blk, err := cas.blocks.Get(c)
	if err != nil {
		return err
	}

	if err := cu.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil {
		return cbor.NewSerializationError(err)
	}

	// Cache a detached copy: o itself refers to the caller's memory, so caching
	// it directly would let later writes to *out rewrite the cache entry.
	snapshot := reflect.New(o.Type()).Elem()
	snapshot.Set(o)
	cas.cache.Add(c, snapshot)
	return nil
}

// Put stores v through the wrapped adt.Store and returns the resulting CID.
// The cache is not populated by Put.
func (cas *CachingStateStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
	id, err := cas.store.Put(ctx, v)
	return id, err
}

0 comments on commit df5bcfa

Please sign in to comment.