diff --git a/api/types.go b/api/types.go index cababc743b8..63ec2b29b96 100644 --- a/api/types.go +++ b/api/types.go @@ -403,7 +403,6 @@ func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error { type ChainExportConfig struct { WriteBufferSize int NumWorkers int - CacheSize int IncludeMessages bool IncludeReceipts bool IncludeStateRoots bool diff --git a/chain/store/cache.go b/chain/store/cache.go deleted file mode 100644 index 4106d9556a2..00000000000 --- a/chain/store/cache.go +++ /dev/null @@ -1,118 +0,0 @@ -package store - -import ( - "context" - "fmt" - "sync/atomic" - - lru "github.com/hashicorp/golang-lru" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - - "github.com/filecoin-project/lotus/blockstore" -) - -type CachingBlockstore struct { - cache *lru.ARCCache - blocks blockstore.Blockstore - reads int64 // updated atomically - hits int64 // updated atomically - bytes int64 // updated atomically -} - -func NewCachingBlockstore(blocks blockstore.Blockstore, cacheSize int) (*CachingBlockstore, error) { - cache, err := lru.NewARC(cacheSize) - if err != nil { - return nil, fmt.Errorf("new arc: %w", err) - } - - return &CachingBlockstore{ - cache: cache, - blocks: blocks, - }, nil -} - -func (cs *CachingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { - return cs.blocks.DeleteBlock(ctx, c) -} - -func (cs *CachingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { - return cs.blocks.GetSize(ctx, c) -} - -func (cs *CachingBlockstore) Put(ctx context.Context, blk blocks.Block) error { - return cs.blocks.Put(ctx, blk) -} - -func (cs *CachingBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error { - return cs.blocks.PutMany(ctx, blks) -} - -func (cs *CachingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return cs.blocks.AllKeysChan(ctx) -} - -func (cs *CachingBlockstore) HashOnRead(enabled bool) { - cs.blocks.HashOnRead(enabled) -} - -func (cs *CachingBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error { - return cs.blocks.DeleteMany(ctx, cids) -} - -func (cs *CachingBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { - reads := atomic.AddInt64(&cs.reads, 1) - if reads%100000 == 0 { - hits := atomic.LoadInt64(&cs.hits) - by := atomic.LoadInt64(&cs.bytes) - log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by) - } - - v, hit := cs.cache.Get(c) - if hit { - atomic.AddInt64(&cs.hits, 1) - return v.(blocks.Block), nil - } - - blk, err := cs.blocks.Get(ctx, c) - if err != nil { - return nil, err - } - - atomic.AddInt64(&cs.bytes, int64(len(blk.RawData()))) - cs.cache.Add(c, blk) - return blk, err -} - -func (cs *CachingBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error { - reads := atomic.AddInt64(&cs.reads, 1) - if reads%1000000 == 0 { - hits := atomic.LoadInt64(&cs.hits) - by := atomic.LoadInt64(&cs.bytes) - log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by) - } - v, hit := cs.cache.Get(c) - if hit { - atomic.AddInt64(&cs.hits, 1) - return callback(v.(blocks.Block).RawData()) - } - - blk, err := cs.blocks.Get(ctx, c) - if err != nil { - return err - } - - atomic.AddInt64(&cs.bytes, int64(len(blk.RawData()))) - cs.cache.Add(c, blk) - return callback(blk.RawData()) -} - -func (cs *CachingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { - atomic.AddInt64(&cs.reads, 1) - // Safe to query cache since blockstore never deletes - if cs.cache.Contains(c) { - return true, nil - } - - return cs.blocks.Has(ctx, c) -} diff --git a/chain/store/snapshot.go b/chain/store/snapshot.go index 4a8787e90b3..39aeca4e71e 100644 --- a/chain/store/snapshot.go +++ b/chain/store/snapshot.go @@ -508,8 +508,7 @@ func (cs *ChainStore) ExportRange( w io.Writer, head, tail *types.TipSet, messages, receipts, stateroots bool, - workers int, - cacheSize int) error { + workers int) error { h := &car.CarHeader{ Roots: head.Cids(), @@ -520,11 +519,6 @@ func (cs *ChainStore) ExportRange( return xerrors.Errorf("failed to write car header: %s", err) } - cacheStore, err := NewCachingBlockstore(cs.UnionStore(), cacheSize) - if err != nil { - return err - } - start := time.Now() log.Infow("walking snapshot range", "head", head.Key(), @@ -544,7 +538,7 @@ func (cs *ChainStore) ExportRange( includeReceipts: receipts, } - pw, err := newWalkScheduler(ctx, cacheStore, cfg, w) + pw, err := newWalkScheduler(ctx, cs.UnionStore(), cfg, w) if err != nil { return err } diff --git a/cli/chain.go b/cli/chain.go index 9c59a7dfa44..88f511f89ab 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -1182,11 +1182,6 @@ var ChainExportRangeCmd = &cli.Command{ Usage: "specify the number of workers", Value: 1, }, - &cli.IntFlag{ - Name: "cache-size", - Usage: "specify the size of the cache (in objects) to use while exporting", - Value: 100_000, - }, &cli.IntFlag{ Name: "write-buffer", Usage: "specify write buffer size", @@ -1243,7 +1238,6 @@ var ChainExportRangeCmd = &cli.Command{ if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{ WriteBufferSize: cctx.Int("write-buffer"), NumWorkers: cctx.Int("workers"), - CacheSize: cctx.Int("cache-size"), IncludeMessages: cctx.Bool("messages"), IncludeReceipts: cctx.Bool("receipts"), IncludeStateRoots: cctx.Bool("stateroots"), @@ -1256,7 +1250,6 @@ var ChainExportRangeCmd = &cli.Command{ stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{ WriteBufferSize: cctx.Int("write-buffer"), NumWorkers: cctx.Int("workers"), - CacheSize: cctx.Int("cache-size"), IncludeMessages: cctx.Bool("messages"), IncludeReceipts: cctx.Bool("receipts"), IncludeStateRoots: cctx.Bool("stateroots"), diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index 6e8b89e54f4..64bc95b1aff 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -630,7 +630,7 @@ func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types bw, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, - cfg.NumWorkers, cfg.CacheSize, + cfg.NumWorkers, ); err != nil { return fmt.Errorf("exporting chain range: %w", err) } @@ -658,7 +658,7 @@ func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetK headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, - cfg.NumWorkers, cfg.CacheSize, + cfg.NumWorkers, ) bw.Flush() //nolint:errcheck // it is a write to a pipe w.CloseWithError(err) //nolint:errcheck // it is a pipe