From 31b894e3f0a26b44773bf36c2778dac183bdf5af Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 6 Feb 2023 12:01:05 +0100 Subject: [PATCH] Ranged-export: Remove CachingBlockstore The improvements in the range-export code lead to avoid reading most blocks twice, as well as to allowing some blocks to be written to disk multiple times. The cache hit-rate went down from being close to 50% to a maximum of 12% at the very end of the export. The reason is that most CIDs are never read twice since they are correctly tracked in the CID set. These numbers do not support the maintenance of the CachingBlockstore code. Additional testing shows that removing it has similar memory-usage behaviour and about 5 minute-faster execution (around 10%). Less code to maintain and less options to mess up with. --- api/types.go | 1 - chain/store/cache.go | 118 ---------------------------------------- chain/store/snapshot.go | 10 +--- cli/chain.go | 7 --- node/impl/full/chain.go | 4 +- 5 files changed, 4 insertions(+), 136 deletions(-) delete mode 100644 chain/store/cache.go diff --git a/api/types.go b/api/types.go index cababc743b8..63ec2b29b96 100644 --- a/api/types.go +++ b/api/types.go @@ -403,7 +403,6 @@ func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error { type ChainExportConfig struct { WriteBufferSize int NumWorkers int - CacheSize int IncludeMessages bool IncludeReceipts bool IncludeStateRoots bool diff --git a/chain/store/cache.go b/chain/store/cache.go deleted file mode 100644 index 4106d9556a2..00000000000 --- a/chain/store/cache.go +++ /dev/null @@ -1,118 +0,0 @@ -package store - -import ( - "context" - "fmt" - "sync/atomic" - - lru "github.com/hashicorp/golang-lru" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - - "github.com/filecoin-project/lotus/blockstore" -) - -type CachingBlockstore struct { - cache *lru.ARCCache - blocks blockstore.Blockstore - reads int64 // updated atomically - hits int64 // updated atomically - bytes int64 // updated atomically -} - -func NewCachingBlockstore(blocks blockstore.Blockstore, cacheSize int) (*CachingBlockstore, error) { - cache, err := lru.NewARC(cacheSize) - if err != nil { - return nil, fmt.Errorf("new arc: %w", err) - } - - return &CachingBlockstore{ - cache: cache, - blocks: blocks, - }, nil -} - -func (cs *CachingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { - return cs.blocks.DeleteBlock(ctx, c) -} - -func (cs *CachingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { - return cs.blocks.GetSize(ctx, c) -} - -func (cs *CachingBlockstore) Put(ctx context.Context, blk blocks.Block) error { - return cs.blocks.Put(ctx, blk) -} - -func (cs *CachingBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error { - return cs.blocks.PutMany(ctx, blks) -} - -func (cs *CachingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return cs.blocks.AllKeysChan(ctx) -} - -func (cs *CachingBlockstore) HashOnRead(enabled bool) { - cs.blocks.HashOnRead(enabled) -} - -func (cs *CachingBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { - return cs.blocks.DeleteMany(ctx, cids) -} - -func (cs *CachingBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { - reads := atomic.AddInt64(&cs.reads, 1) - if reads%100000 == 0 { - hits := atomic.LoadInt64(&cs.hits) - by := atomic.LoadInt64(&cs.bytes) - log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by) - } - - v, hit := cs.cache.Get(c) - if hit { - atomic.AddInt64(&cs.hits, 1) - return v.(blocks.Block), nil - } - - blk, err := cs.blocks.Get(ctx, c) - if err != nil { - return nil, err - } - - atomic.AddInt64(&cs.bytes, int64(len(blk.RawData()))) - cs.cache.Add(c, blk) - return blk, err -} - -func (cs *CachingBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error { - reads := atomic.AddInt64(&cs.reads, 1) - if reads%1000000 == 0 { - hits := atomic.LoadInt64(&cs.hits) - by := atomic.LoadInt64(&cs.bytes) - log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by) - } - v, hit := cs.cache.Get(c) - if hit { - atomic.AddInt64(&cs.hits, 1) - return callback(v.(blocks.Block).RawData()) - } - - blk, err := cs.blocks.Get(ctx, c) - if err != nil { - return err - } - - atomic.AddInt64(&cs.bytes, int64(len(blk.RawData()))) - cs.cache.Add(c, blk) - return callback(blk.RawData()) -} - -func (cs *CachingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { - atomic.AddInt64(&cs.reads, 1) - // Safe to query cache since blockstore never deletes - if cs.cache.Contains(c) { - return true, nil - } - - return cs.blocks.Has(ctx, c) -} diff --git a/chain/store/snapshot.go b/chain/store/snapshot.go index 4a8787e90b3..39aeca4e71e 100644 --- a/chain/store/snapshot.go +++ b/chain/store/snapshot.go @@ -508,8 +508,7 @@ func (cs *ChainStore) ExportRange( w io.Writer, head, tail *types.TipSet, messages, receipts, stateroots bool, - workers int, - cacheSize int) error { + workers int) error { h := &car.CarHeader{ Roots: head.Cids(), @@ -520,11 +519,6 @@ func (cs *ChainStore) ExportRange( return xerrors.Errorf("failed to write car header: %s", err) } - cacheStore, err := NewCachingBlockstore(cs.UnionStore(), cacheSize) - if err != nil { - return err - } - start := time.Now() log.Infow("walking snapshot range", "head", head.Key(), @@ -544,7 +538,7 @@ func (cs *ChainStore) ExportRange( includeReceipts: receipts, } - pw, err := newWalkScheduler(ctx, cacheStore, cfg, w) + pw, err := newWalkScheduler(ctx, cs.UnionStore(), cfg, w) if err != nil { return err } diff --git a/cli/chain.go b/cli/chain.go index 9c59a7dfa44..88f511f89ab 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -1182,11 +1182,6 @@ var ChainExportRangeCmd = &cli.Command{ Usage: "specify the number of workers", Value: 1, }, - &cli.IntFlag{ - Name: "cache-size", - Usage: "specify the size of the cache (in objects) to use while exporting", - Value: 100_000, - }, &cli.IntFlag{ Name: "write-buffer", Usage: "specify write buffer size", @@ -1243,7 +1238,6 @@ var ChainExportRangeCmd = &cli.Command{ if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{ WriteBufferSize: cctx.Int("write-buffer"), NumWorkers: cctx.Int("workers"), - CacheSize: cctx.Int("cache-size"), IncludeMessages: cctx.Bool("messages"), IncludeReceipts: cctx.Bool("receipts"), IncludeStateRoots: cctx.Bool("stateroots"), @@ -1256,7 +1250,6 @@ var ChainExportRangeCmd = &cli.Command{ stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{ WriteBufferSize: cctx.Int("write-buffer"), NumWorkers: cctx.Int("workers"), - CacheSize: cctx.Int("cache-size"), IncludeMessages: cctx.Bool("messages"), IncludeReceipts: cctx.Bool("receipts"), IncludeStateRoots: cctx.Bool("stateroots"), diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 6e8b89e54f4..64bc95b1aff 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -630,7 +630,7 @@ func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types bw, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, - cfg.NumWorkers, cfg.CacheSize, + cfg.NumWorkers, ); err != nil { return fmt.Errorf("exporting chain range: %w", err) } @@ -658,7 +658,7 @@ func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetK headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, - cfg.NumWorkers, cfg.CacheSize, + cfg.NumWorkers, ) bw.Flush() //nolint:errcheck // it is a write to a pipe w.CloseWithError(err) //nolint:errcheck // it is a pipe