Skip to content

Commit

Permalink
First efficient ranged-export implementation by @frisst
Browse files Browse the repository at this point in the history
This first commit contains the first and second implementation stabs (after
primary review by @hsanjuan), using a stack for task buffering.

Known issues: ctrl-c (context cancellation) results in the export code getting
deadlocked. Duplicate blocks in exports. Duplicate block reads from store.

Original commit messages:

works

works against mainnet and calibnet

feat: add internal export api method

- will hopfully make things faster by not streaming the export over the json rpc api

polish: better file nameing

fix: potential race in marking cids as seen

chore: improve logging

feat: front export with cache

fix: give hector a good channel buffer on this shit

docsgen
  • Loading branch information
frrist authored and hsanjuan committed Feb 6, 2023
1 parent 62326b2 commit 828354c
Show file tree
Hide file tree
Showing 13 changed files with 980 additions and 0 deletions.
4 changes: 4 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read

ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) (<-chan []byte, error) //perm:read

ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) error //perm:read

// ChainPrune prunes the stored chain state and garbage collects; only supported if you
// are using the splitstore
ChainPrune(ctx context.Context, opts PruneOpts) error //perm:admin
Expand Down
29 changes: 29 additions & 0 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,3 +398,12 @@ func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error {
}
return nil
}

type ChainExportConfig struct {
WriteBufferSize int
Workers int64
CacheSize int
IncludeMessages bool
IncludeReceipts bool
IncludeStateRoots bool
}
4 changes: 4 additions & 0 deletions api/v0api/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ type FullNode interface {
// If oldmsgskip is set, messages from before the requested roots are also not included.
ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read

ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) (<-chan []byte, error) //perm:read

ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error //perm:read

// MethodGroup: Beacon
// The Beacon method group contains methods for interacting with the random beacon (DRAND)

Expand Down
26 changes: 26 additions & 0 deletions api/v0api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions api/v0api/v0mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

118 changes: 118 additions & 0 deletions chain/store/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package store

import (
"context"
"fmt"
"sync/atomic"

lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"github.com/filecoin-project/lotus/blockstore"
)

type CachingBlockstore struct {
cache *lru.ARCCache
blocks blockstore.Blockstore
reads int64 // updated atomically
hits int64 // updated atomically
bytes int64 // updated atomically
}

func NewCachingBlockstore(blocks blockstore.Blockstore, cacheSize int) (*CachingBlockstore, error) {
cache, err := lru.NewARC(cacheSize)
if err != nil {
return nil, fmt.Errorf("new arc: %w", err)
}

return &CachingBlockstore{
cache: cache,
blocks: blocks,
}, nil
}

func (cs *CachingBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
return cs.blocks.DeleteBlock(ctx, c)
}

func (cs *CachingBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
return cs.blocks.GetSize(ctx, c)
}

func (cs *CachingBlockstore) Put(ctx context.Context, blk blocks.Block) error {
return cs.blocks.Put(ctx, blk)
}

func (cs *CachingBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error {
return cs.blocks.PutMany(ctx, blks)
}

func (cs *CachingBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return cs.blocks.AllKeysChan(ctx)
}

func (cs *CachingBlockstore) HashOnRead(enabled bool) {
cs.blocks.HashOnRead(enabled)
}

func (cs *CachingBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) error {
return cs.blocks.DeleteMany(ctx, cids)
}

func (cs *CachingBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
reads := atomic.AddInt64(&cs.reads, 1)
if reads%100000 == 0 {
hits := atomic.LoadInt64(&cs.hits)
by := atomic.LoadInt64(&cs.bytes)
log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by)
}

v, hit := cs.cache.Get(c)
if hit {
atomic.AddInt64(&cs.hits, 1)
return v.(blocks.Block), nil
}

blk, err := cs.blocks.Get(ctx, c)
if err != nil {
return nil, err
}

atomic.AddInt64(&cs.bytes, int64(len(blk.RawData())))
cs.cache.Add(c, blk)
return blk, err
}

func (cs *CachingBlockstore) View(ctx context.Context, c cid.Cid, callback func([]byte) error) error {
reads := atomic.AddInt64(&cs.reads, 1)
if reads%1000000 == 0 {
hits := atomic.LoadInt64(&cs.hits)
by := atomic.LoadInt64(&cs.bytes)
log.Infow("CachingBlockstore stats", "reads", reads, "cache_len", cs.cache.Len(), "hit_rate", float64(hits)/float64(reads), "bytes_read", by)
}
v, hit := cs.cache.Get(c)
if hit {
atomic.AddInt64(&cs.hits, 1)
return callback(v.(blocks.Block).RawData())
}

blk, err := cs.blocks.Get(ctx, c)
if err != nil {
return err
}

atomic.AddInt64(&cs.bytes, int64(len(blk.RawData())))
cs.cache.Add(c, blk)
return callback(blk.RawData())
}

func (cs *CachingBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
atomic.AddInt64(&cs.reads, 1)
// Safe to query cache since blockstore never deletes
if cs.cache.Contains(c) {
return true, nil
}

return cs.blocks.Has(ctx, c)
}
Loading

0 comments on commit 828354c

Please sign in to comment.