Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add block and actor caches #766

Merged
merged 2 commits into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lily/config"
"github.com/filecoin-project/lily/lens/lily"
"github.com/filecoin-project/lily/lens/lily/modules"
lutil "github.com/filecoin-project/lily/lens/util"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
)
Expand Down Expand Up @@ -67,6 +68,11 @@ type daemonOpts struct {

var daemonFlags daemonOpts

// cacheFlags receives the values of the --blockstore-cache-size and
// --statestore-cache-size daemon flags. Both flags default to 0, and the
// values are handed to modules.CacheConfig when the node is constructed.
var cacheFlags struct {
BlockstoreCacheSize uint // number of raw blocks to cache in memory
StatestoreCacheSize uint // number of decoded actor states to cache in memory
}

var DaemonCmd = &cli.Command{
Name: "daemon",
Usage: "Start a lily daemon process.",
Expand Down Expand Up @@ -148,6 +154,18 @@ Note that jobs are not persisted between restarts of the daemon. See
EnvVars: []string{"LILY_GENESIS"},
Destination: &daemonFlags.genesis,
},
&cli.UintFlag{
Name: "blockstore-cache-size",
EnvVars: []string{"LILY_BLOCKSTORE_CACHE_SIZE"},
Value: 0,
Destination: &cacheFlags.BlockstoreCacheSize,
},
&cli.UintFlag{
Name: "statestore-cache-size",
EnvVars: []string{"LILY_STATESTORE_CACHE_SIZE"},
Value: 0,
Destination: &cacheFlags.StatestoreCacheSize,
},
},
Action: func(c *cli.Context) error {
lotuslog.SetupLogLevels()
Expand Down Expand Up @@ -234,13 +252,16 @@ Note that jobs are not persisted between restarts of the daemon. See
node.Override(new(*events.Events), modules.NewEvents),
node.Override(new(*schedule.Scheduler), schedule.NewSchedulerDaemon),
node.Override(new(*storage.Catalog), modules.NewStorageCatalog),
node.Override(new(*lutil.CacheConfig), modules.CacheConfig(cacheFlags.BlockstoreCacheSize, cacheFlags.StatestoreCacheSize)),
// End Injection

node.Override(new(dtypes.Bootstrapper), isBootstrapper),
node.Override(new(dtypes.ShutdownChan), shutdown),
node.Base(),
node.Repo(r),

node.Override(new(dtypes.UniversalBlockstore), modules.NewCachingUniversalBlockstore),

// Inject a custom StateManager, must be done after the node.Online() call as we are
// overriding the OG lotus StateManager.
node.Override(new(*stmgr.StateManager), modules.StateManager),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.uber.org/fx v1.9.0
go.uber.org/fx v1.15.0
go.uber.org/zap v1.19.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gopkg.in/cheggaaa/pb.v1 v1.0.28
Expand Down
10 changes: 7 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1840,13 +1840,16 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/dig v1.10.0 h1:yLmDDj9/zuDjv3gz8GQGviXMs9TfysIUMUilCpgzUJY=
go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.9.0 h1:7OAz8ucp35AU8eydejpYG7QrbE8rLKzGhHbZlJi5LYY=
go.uber.org/dig v1.12.0 h1:l1GQeZpEbss0/M4l/ZotuBndCrkMdjnygzgcuOjAdaY=
go.uber.org/dig v1.12.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.9.0/go.mod h1:mFdUyAUuJ3w4jAckiKSKbldsxy1ojpAMJ+dVZg5Y0Aw=
go.uber.org/fx v1.15.0 h1:kcfBpAm98n0ksanyyZLFE/Q3T7yPi13Ge2liu3TxR+A=
go.uber.org/fx v1.15.0/go.mod h1:jI3RazQUhGv5KkpZIRv+kuP4CcgX3fnc0qX8bLnzbx8=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down Expand Up @@ -2139,6 +2142,7 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210903071746-97244b99971b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7 h1:c20P3CcPbopVp2f7099WLOqSNKURf30Z0uq66HpijZY=
golang.org/x/sys v0.0.0-20210923061019-b8560ed6a9b7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand Down
22 changes: 20 additions & 2 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
var _ LilyAPI = (*LilyNodeAPI)(nil)

type LilyNodeAPI struct {
fx.In
fx.In `ignore-unexported:"true"`

net.NetAPI
full.ChainAPI
Expand All @@ -43,6 +43,9 @@ type LilyNodeAPI struct {
Scheduler *schedule.Scheduler
StorageCatalog *storage.Catalog
ExecMonitor stmgr.ExecMonitor
CacheConfig *util.CacheConfig
actorStore adt.Store
actorStoreInit sync.Once
}

func (m *LilyNodeAPI) ChainGetTipSetAfterHeight(ctx context.Context, epoch abi.ChainEpoch, key types.TipSetKey) (*types.TipSet, error) {
Expand Down Expand Up @@ -298,7 +301,22 @@ func (m *LilyNodeAPI) GetMessageExecutionsForTipSet(ctx context.Context, next *t
}

// Store returns the adt.Store used to read actor state. The store is built
// once, guarded by actorStoreInit: when CacheConfig.StatestoreCacheSize is
// greater than zero a caching state store over the chain's state blockstore
// is created; if caching is disabled or that construction fails (the failure
// is logged), the chain's plain ActorStore is used instead.
func (m *LilyNodeAPI) Store() adt.Store {
// NOTE(review): the bare return on the next line looks like the pre-change
// body captured from the diff view, not part of the new implementation — confirm.
return m.ChainAPI.Chain.ActorStore(context.TODO())
m.actorStoreInit.Do(func() {
if m.CacheConfig.StatestoreCacheSize > 0 {
var err error
log.Infof("creating caching statestore with size=%d", m.CacheConfig.StatestoreCacheSize)
m.actorStore, err = util.NewCachingStateStore(m.ChainAPI.Chain.StateBlockstore(), int(m.CacheConfig.StatestoreCacheSize))
if err == nil {
return // done
}

log.Errorf("failed to create caching statestore: %v", err)
}

// Fallback for both "caching disabled" and "caching store could not be created".
m.actorStore = m.ChainAPI.Chain.ActorStore(context.TODO())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be within the above else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. it creates a non-caching actor store as a fallback from 2 scenarios: there was an error returned from util.NewCachingStateStore or m.CacheConfig.StatestoreCacheSize<=0

(if no error returned from util.NewCachingStateStore then we have already returned from the function)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah whoops, I missed the return in the inner-most else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to make clearer what the flow is here

})

return m.actorStore
}

func (m *LilyNodeAPI) StateGetReceipt(ctx context.Context, msg cid.Cid, from types.TipSetKey) (*types.MessageReceipt, error) {
Expand Down
49 changes: 49 additions & 0 deletions lens/lily/modules/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package modules

import (
"context"
"io"

"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
"go.uber.org/fx"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/lens/util"
)

// CacheConfig captures the two cache sizes and returns a constructor that
// yields a freshly allocated *util.CacheConfig carrying them. The lifecycle
// and metrics-context arguments of the returned constructor are accepted but
// not used; the constructor never fails.
func CacheConfig(blockstoreCacheSize uint, statestoreCacheSize uint) func(lc fx.Lifecycle, mctx helpers.MetricsCtx) (*util.CacheConfig, error) {
	sizes := util.CacheConfig{
		BlockstoreCacheSize: blockstoreCacheSize,
		StatestoreCacheSize: statestoreCacheSize,
	}

	return func(_ fx.Lifecycle, _ helpers.MetricsCtx) (*util.CacheConfig, error) {
		// Copy so that every invocation hands out its own instance.
		cfg := sizes
		return &cfg, nil
	}
}

// NewCachingUniversalBlockstore opens the repo's universal blockstore and,
// when cc.BlockstoreCacheSize is non-zero, wraps it in an in-memory caching
// blockstore of that many entries.
//
// If the opened blockstore implements io.Closer, a lifecycle stop hook is
// registered to close it. With a cache size of zero the blockstore is
// returned unwrapped. An error is returned if the blockstore cannot be
// opened or the cache cannot be created; the latter wraps the underlying
// error so callers can inspect it with errors.Is / errors.As.
func NewCachingUniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, cc *util.CacheConfig, r repo.LockedRepo) (dtypes.UniversalBlockstore, error) {
	bs, err := r.Blockstore(helpers.LifecycleCtx(mctx, lc), repo.UniversalBlockstore)
	if err != nil {
		return nil, err
	}
	if c, ok := bs.(io.Closer); ok {
		lc.Append(fx.Hook{
			OnStop: func(_ context.Context) error {
				return c.Close()
			},
		})
	}

	// Caching is opt-in: a zero size means "use the blockstore as is".
	if cc.BlockstoreCacheSize == 0 {
		return bs, nil
	}

	log.Infof("creating caching blockstore with size=%d", cc.BlockstoreCacheSize)
	cbs, err := util.NewCachingBlockstore(bs, int(cc.BlockstoreCacheSize))
	if err != nil {
		// %w (rather than %v) keeps the cause in the error chain.
		return nil, xerrors.Errorf("failed to create caching blockstore: %w", err)
	}

	return cbs, nil
}
195 changes: 195 additions & 0 deletions lens/util/store.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
package util

import (
"bytes"
"context"
"reflect"
"sync/atomic"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/specs-actors/actors/util/adt"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)

// CacheConfig carries the sizes of the optional in-memory caches.
type CacheConfig struct {
// BlockstoreCacheSize is the capacity requested for the raw-block cache.
BlockstoreCacheSize uint
// StatestoreCacheSize is the capacity requested for the decoded-state cache.
StatestoreCacheSize uint
}

func NewCachingStore(backing blockstore.Blockstore) *ProxyingBlockstore {
bs := blockstore.NewMemorySync()

Expand Down Expand Up @@ -115,3 +127,186 @@ func (pb *ProxyingBlockstore) GetCount() int64 {
// ResetMetrics atomically sets the gets counter back to zero.
func (pb *ProxyingBlockstore) ResetMetrics() {
atomic.StoreInt64(&pb.gets, 0)
}

var _ blockstore.Blockstore = (*CachingBlockstore)(nil)

// CachingBlockstore is a blockstore.Blockstore that keeps blocks fetched
// through Get and View in an ARC cache in front of a backing blockstore.
// All other operations are forwarded to the backing blockstore.
type CachingBlockstore struct {
// cache maps a block's CID to the block itself.
cache *lru.ARCCache
// blocks is the backing blockstore consulted on cache misses.
blocks blockstore.Blockstore
// reads counts calls to Get, View and Has.
reads int64 // updated atomically
// hits counts Get and View calls answered from the cache.
hits int64 // updated atomically
// bytes is the total size of block data fetched from the backing blockstore on misses.
bytes int64 // updated atomically
}

// NewCachingBlockstore returns a CachingBlockstore that serves reads from
// blocks through an ARC cache created with the given cacheSize. It returns
// an error, wrapping the one from lru.NewARC, if the cache cannot be created.
func NewCachingBlockstore(blocks blockstore.Blockstore, cacheSize int) (*CachingBlockstore, error) {
cache, err := lru.NewARC(cacheSize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto comment wrt the error check here.

if err != nil {
return nil, xerrors.Errorf("new arc: %w", err)
}

return &CachingBlockstore{
cache: cache,
blocks: blocks,
}, nil
}

// DeleteBlock removes the block identified by c from the backing blockstore
// and evicts any cached copy, so that later Get, View or Has calls cannot be
// answered from a stale cache entry. The error from the backing blockstore is
// returned unchanged.
func (cs *CachingBlockstore) DeleteBlock(c cid.Cid) error {
	err := cs.blocks.DeleteBlock(c)
	// Evict regardless of the outcome: dropping a cache entry is always safe,
	// whereas keeping one for a deleted block is not.
	cs.cache.Remove(c)
	return err
}

// GetSize returns the result of GetSize on the backing blockstore; the cache
// is not consulted.
func (cs *CachingBlockstore) GetSize(c cid.Cid) (int, error) {
return cs.blocks.GetSize(c)
}

// Put writes blk to the backing blockstore. The block is not added to the
// cache by this call.
func (cs *CachingBlockstore) Put(blk blocks.Block) error {
return cs.blocks.Put(blk)
}

// PutMany writes blks to the backing blockstore. The blocks are not added to
// the cache by this call.
func (cs *CachingBlockstore) PutMany(blks []blocks.Block) error {
return cs.blocks.PutMany(blks)
}

// AllKeysChan returns the key channel produced by the backing blockstore.
func (cs *CachingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return cs.blocks.AllKeysChan(ctx)
}

// HashOnRead forwards the setting to the backing blockstore.
func (cs *CachingBlockstore) HashOnRead(enabled bool) {
cs.blocks.HashOnRead(enabled)
}

// DeleteMany removes the blocks identified by cids from the backing
// blockstore and evicts any cached copies, so that later Get, View or Has
// calls cannot be answered from stale cache entries. The error from the
// backing blockstore is returned unchanged.
func (cs *CachingBlockstore) DeleteMany(cids []cid.Cid) error {
	err := cs.blocks.DeleteMany(cids)
	// Evict regardless of the outcome: a failed batch may still have removed
	// some blocks, and dropping cache entries is always safe.
	for _, c := range cids {
		cs.cache.Remove(c)
	}
	return err
}

// Get returns the block identified by c. A cached block is returned directly;
// otherwise the block is fetched from the backing blockstore, added to the
// cache and returned. Errors from the backing blockstore are passed through.
func (cs *CachingBlockstore) Get(c cid.Cid) (blocks.Block, error) {
	// Emit a stats line once every million reads.
	if total := atomic.AddInt64(&cs.reads, 1); total%1000000 == 0 {
		hitCount := atomic.LoadInt64(&cs.hits)
		byteCount := atomic.LoadInt64(&cs.bytes)
		log.Debugw("CachingBlockstore stats", "reads", total, "cache_len", cs.cache.Len(), "hit_rate", float64(hitCount)/float64(total), "bytes_read", byteCount)
	}

	if cached, found := cs.cache.Get(c); found {
		atomic.AddInt64(&cs.hits, 1)
		return cached.(blocks.Block), nil
	}

	fetched, err := cs.blocks.Get(c)
	if err != nil {
		return nil, err
	}

	// Account for the bytes pulled from the backing store, then remember the block.
	atomic.AddInt64(&cs.bytes, int64(len(fetched.RawData())))
	cs.cache.Add(c, fetched)
	return fetched, nil
}

// View invokes callback with the raw data of the block identified by c and
// returns the callback's result. The data comes from the cache when present;
// otherwise the block is fetched with Get on the backing blockstore and added
// to the cache first. A fetch error is returned without calling callback.
func (cs *CachingBlockstore) View(c cid.Cid, callback func([]byte) error) error {
	// Emit a stats line once every million reads.
	if total := atomic.AddInt64(&cs.reads, 1); total%1000000 == 0 {
		hitCount := atomic.LoadInt64(&cs.hits)
		byteCount := atomic.LoadInt64(&cs.bytes)
		log.Debugw("CachingBlockstore stats", "reads", total, "cache_len", cs.cache.Len(), "hit_rate", float64(hitCount)/float64(total), "bytes_read", byteCount)
	}

	cached, found := cs.cache.Get(c)
	if !found {
		fetched, err := cs.blocks.Get(c)
		if err != nil {
			return err
		}

		atomic.AddInt64(&cs.bytes, int64(len(fetched.RawData())))
		cs.cache.Add(c, fetched)
		return callback(fetched.RawData())
	}

	atomic.AddInt64(&cs.hits, 1)
	return callback(cached.(blocks.Block).RawData())
}

// Has reports whether the block identified by c is available. A block that
// is present in the cache is reported as available without touching the
// backing blockstore; otherwise the backing blockstore is asked.
//
// Every call counts as a read, and a call answered from the cache also
// counts as a hit, so the hit rate logged by Get and View is not skewed by
// Has traffic.
func (cs *CachingBlockstore) Has(c cid.Cid) (bool, error) {
	atomic.AddInt64(&cs.reads, 1)

	// A cached entry is taken as proof of presence; this relies on blocks not
	// disappearing from the backing blockstore while they are still cached.
	if cs.cache.Contains(c) {
		atomic.AddInt64(&cs.hits, 1)
		return true, nil
	}

	return cs.blocks.Has(c)
}

var _ adt.Store = (*CachingStateStore)(nil)

// CachingStateStore is an adt.Store that keeps decoded objects in an ARC
// cache so repeated Get calls for the same CID can skip CBOR decoding.
type CachingStateStore struct {
// cache maps a CID to the reflect.Value of the object decoded for it.
cache *lru.ARCCache
// blocks supplies the raw block data on cache misses.
blocks blockstore.Blockstore
// store is the plain adt.Store that Put delegates to.
store adt.Store
// reads counts calls to Get.
reads int64 // updated atomically
// hits counts Get calls that found their CID in the cache.
hits int64 // updated atomically
}

// NewCachingStateStore returns a CachingStateStore that reads raw blocks from
// blocks and caches decoded objects in an ARC cache created with the given
// cacheSize. Writes go through an adt.Store wrapping a CBOR store over the
// same blockstore. It returns an error, wrapping the one from lru.NewARC, if
// the cache cannot be created.
func NewCachingStateStore(blocks blockstore.Blockstore, cacheSize int) (*CachingStateStore, error) {
cache, err := lru.NewARC(cacheSize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iirc this only errors when a negative value is passed, could panic here instead and simplify some of the calling code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's good UX to panic on user input

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, and enforcement of positive integers could be done with Uint flags, but I'm alright with this as is.

if err != nil {
return nil, xerrors.Errorf("new arc: %w", err)
}

store := adt.WrapStore(context.Background(), cbor.NewCborStore(blocks))

return &CachingStateStore{
cache: cache,
store: store,
blocks: blocks,
}, nil
}

// Context always returns context.Background().
func (cas *CachingStateStore) Context() context.Context {
return context.Background()
}

// Get loads the object identified by c into out.
//
// out must be a non-nil pointer whose type implements cbg.CBORUnmarshaler.
// On a cache hit the cached value is assigned to *out; an error is returned
// if the cached value's type is not assignable to *out. On a miss the raw
// block is read from the blockstore, decoded into out, and a copy of the
// decoded value is cached.
//
// The cached copy is detached from the caller's variable, so reusing or
// overwriting *out after Get returns does not alter the cache. The copy is
// shallow, however: slices, maps and pointers inside the value are shared
// between the cache and every caller, so results should be treated as
// read-only.
//
// ctx is accepted to satisfy adt.Store but is not used.
func (cas *CachingStateStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
	reads := atomic.AddInt64(&cas.reads, 1)
	if reads%1000000 == 0 {
		hits := atomic.LoadInt64(&cas.hits)
		log.Debugw("CachingStateStore stats", "reads", reads, "cache_len", cas.cache.Len(), "hit_rate", float64(hits)/float64(reads))
	}

	cu, ok := out.(cbg.CBORUnmarshaler)
	if !ok {
		return xerrors.Errorf("out parameter does not implement CBORUnmarshaler")
	}

	// Reject non-pointers and nil pointers up front: Elem would panic on the
	// former and yield an invalid Value (poisoning the cache) on the latter.
	ptr := reflect.ValueOf(out)
	if ptr.Kind() != reflect.Ptr || ptr.IsNil() {
		return xerrors.Errorf("out parameter cannot be set")
	}
	target := ptr.Elem()

	if v, hit := cas.cache.Get(c); hit {
		atomic.AddInt64(&cas.hits, 1)

		cached, isValue := v.(reflect.Value)
		if !isValue || !cached.Type().AssignableTo(target.Type()) {
			return xerrors.Errorf("out parameter cannot be assigned cached value")
		}

		target.Set(cached)
		return nil
	}

	blk, err := cas.blocks.Get(c)
	if err != nil {
		return err
	}

	if err := cu.UnmarshalCBOR(bytes.NewReader(blk.RawData())); err != nil {
		return cbor.NewSerializationError(err)
	}

	// Cache a copy rather than target itself: target aliases the caller's
	// variable, which the caller is free to overwrite after we return.
	snapshot := reflect.New(target.Type()).Elem()
	snapshot.Set(target)
	cas.cache.Add(c, snapshot)
	return nil
}

// Put stores v through the wrapped adt.Store and returns the resulting CID.
// The written object is not added to the cache by this call.
func (cas *CachingStateStore) Put(ctx context.Context, v interface{}) (cid.Cid, error) {
return cas.store.Put(ctx, v)
}