Skip to content

Commit

Permalink
Merge pull request #9741 from filecoin-project/feat/faster-snapshot-i…
Browse files Browse the repository at this point in the history
…mport

feat: chain: Faster snapshot imports, zstd import
  • Loading branch information
magik6k authored Nov 29, 2022
2 parents c3223f4 + ac8ab3e commit 6abf738
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
52 changes: 50 additions & 2 deletions chain/store/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"context"
"io"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car"
carutil "github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
mh "github.com/multiformats/go-multihash"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -55,12 +57,58 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
// universal store. When we physically segregate the stores, we will need
// to route state objects to the state blockstore, and chain objects to
// the chain blockstore.
header, err := car.LoadCar(ctx, cs.StateBlockstore(), r)

br, err := carv2.NewBlockReader(r)
if err != nil {
return nil, xerrors.Errorf("loadcar failed: %w", err)
}

root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(header.Roots...))
s := cs.StateBlockstore()

parallelPuts := 5
putThrottle := make(chan error, parallelPuts)
for i := 0; i < parallelPuts; i++ {
putThrottle <- nil
}

var buf []blocks.Block
for {
blk, err := br.Next()
if err != nil {
if err == io.EOF {
if len(buf) > 0 {
if err := s.PutMany(ctx, buf); err != nil {
return nil, err
}
}

break
}
return nil, err
}

buf = append(buf, blk)

if len(buf) > 1000 {
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
return nil, lastErr
}

go func(buf []blocks.Block) {
putThrottle <- s.PutMany(ctx, buf)
}(buf)
buf = nil
}
}

// check errors
for i := 0; i < parallelPuts; i++ {
if lastErr := <-putThrottle; lastErr != nil {
return nil, lastErr
}
}

root, err := cs.LoadTipSet(ctx, types.NewTipSetKey(br.Roots...))
if err != nil {
return nil, xerrors.Errorf("failed to load root tipset from chainfile: %w", err)
}
Expand Down
20 changes: 19 additions & 1 deletion cmd/lotus/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"runtime/pprof"
"strings"

"github.com/DataDog/zstd"
metricsprom "github.com/ipfs/go-metrics-prometheus"
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -491,15 +492,32 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)

bufr := bufio.NewReaderSize(rd, 1<<20)

header, err := bufr.Peek(4)
if err != nil {
return xerrors.Errorf("peek header: %w", err)
}

bar := pb.New64(l)
br := bar.NewProxyReader(bufr)
bar.ShowTimeLeft = true
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES

var ir io.Reader = br

if string(header[1:]) == "\xB5\x2F\xFD" { // zstd
zr := zstd.NewReader(br)
defer func() {
if err := zr.Close(); err != nil {
log.Errorw("closing zstd reader", "error", err)
}
}()
ir = zr
}

bar.Start()
ts, err := cst.Import(ctx, br)
ts, err := cst.Import(ctx, ir)
bar.Finish()

if err != nil {
Expand Down

0 comments on commit 6abf738

Please sign in to comment.