Skip to content

Commit

Permalink
Merge pull request #5522 from filecoin-project/feat/faster-snapshot-i…
Browse files Browse the repository at this point in the history
…mport

feat: Faster snapshot imports, zstd imports
  • Loading branch information
diwufeiwen authored Dec 1, 2022
2 parents f383a51 + 79502d0 commit 04c4a18
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 5 deletions.
19 changes: 18 additions & 1 deletion cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strings"

"github.com/DataDog/zstd"
"github.com/filecoin-project/venus/pkg/chain"
"github.com/filecoin-project/venus/pkg/repo"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -68,15 +69,31 @@ func importChain(ctx context.Context, r repo.Repo, fname string) error {

bufr := bufio.NewReaderSize(rd, 1<<20)

header, err := bufr.Peek(4)
if err != nil {
return fmt.Errorf("peek header: %w", err)
}

bar := pb.New64(l)
br := bar.NewProxyReader(bufr)
bar.ShowTimeLeft = true
bar.ShowPercent = true
bar.ShowSpeed = true
bar.Units = pb.U_BYTES

var ir io.Reader = br
if string(header[1:]) == "\xB5\x2F\xFD" { // zstd
zr := zstd.NewReader(br)
defer func() {
if err := zr.Close(); err != nil {
log.Errorw("closing zstd reader", "error", err)
}
}()
ir = zr
}

bar.Start()
tip, err := chainStore.Import(ctx, br)
tip, err := chainStore.Import(ctx, ir)
if err != nil {
return fmt.Errorf("importing chain failed: %s", err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ require (
github.com/ipfs/go-merkledag v0.8.0
github.com/ipfs/go-unixfs v0.3.1
github.com/ipld/go-car v0.4.0
github.com/ipld/go-car/v2 v2.4.1
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p v0.22.0
github.com/libp2p/go-libp2p-kad-dht v0.18.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ github.com/ipld/go-car v0.4.0 h1:U6W7F1aKF/OJMHovnOVdst2cpQE5GhmHibQkAixgNcQ=
github.com/ipld/go-car v0.4.0/go.mod h1:Uslcn4O9cBKK9wqHm/cLTFacg6RAPv6LZx2mxd2Ypl4=
github.com/ipld/go-car/v2 v2.1.1/go.mod h1:+2Yvf0Z3wzkv7NeI69i8tuZ+ft7jyjPYIWZzeVNeFcI=
github.com/ipld/go-car/v2 v2.4.1 h1:9S+FYbQzQJ/XzsdiOV13W5Iu/i+gUnr6csbSD9laFEg=
github.com/ipld/go-car/v2 v2.4.1/go.mod h1:zjpRf0Jew9gHqSvjsKVyoq9OY9SWoEKdYCQUKVaaPT0=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-codec-dagpb v1.3.1/go.mod h1:ErNNglIi5KMur/MfFE/svtgQthzVvf+43MrzLbpcIZY=
github.com/ipld/go-codec-dagpb v1.3.2 h1:MZQUIjanHXXfDuYmtWYT8nFbqfFsZuyHClj6VDmSXr4=
Expand All @@ -851,6 +852,7 @@ github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHP
github.com/ipld/go-ipld-prime v0.17.0 h1:+U2peiA3aQsE7mrXjD2nYZaZrCcakoz2Wge8K42Ld8g=
github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDaD+o0bOowhgs=
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73 h1:TsyATB2ZRRQGTwafJdgEUQkmjOExRV0DNokcihZxbnQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
Expand Down
49 changes: 47 additions & 2 deletions pkg/chain/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import (
"github.com/filecoin-project/pubsub"
blockstoreutil "github.com/filecoin-project/venus/venus-shared/blockstore"
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-car"
carutil "github.com/ipld/go-car/util"
carv2 "github.com/ipld/go-car/v2"
mh "github.com/multiformats/go-multihash"
"github.com/pkg/errors"
cbg "github.com/whyrusleeping/cbor-gen"
Expand Down Expand Up @@ -922,12 +924,55 @@ func (store *Store) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRece

// Import import a car file into local db
func (store *Store) Import(ctx context.Context, r io.Reader) (*types.TipSet, error) {
header, err := car.LoadCar(ctx, store.bsstore, r)
br, err := carv2.NewBlockReader(r)
if err != nil {
return nil, fmt.Errorf("loadcar failed: %w", err)
}

root, err := store.GetTipSet(ctx, types.NewTipSetKey(header.Roots...))
parallelPuts := 5
putThrottle := make(chan error, parallelPuts)
for i := 0; i < parallelPuts; i++ {
putThrottle <- nil
}

var buf []blocks.Block
for {
blk, err := br.Next()
if err != nil {
if err == io.EOF {
if len(buf) > 0 {
if err := store.bsstore.PutMany(ctx, buf); err != nil {
return nil, err
}
}

break
}
return nil, err
}

buf = append(buf, blk)

if len(buf) > 1000 {
if lastErr := <-putThrottle; lastErr != nil { // consume one error to have the right to add one
return nil, lastErr
}

go func(buf []blocks.Block) {
putThrottle <- store.bsstore.PutMany(ctx, buf)
}(buf)
buf = nil
}
}

// check errors
for i := 0; i < parallelPuts; i++ {
if lastErr := <-putThrottle; lastErr != nil {
return nil, lastErr
}
}

root, err := store.GetTipSet(ctx, types.NewTipSetKey(br.Roots...))
if err != nil {
return nil, fmt.Errorf("failed to load root tipset from chainfile: %w", err)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/state/tree/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,7 @@ func LoadState(ctx context.Context, cst cbor.IpldStore, c cid.Cid) (*State, erro
return nil, fmt.Errorf("unsupported state tree version: %d", root.Version)
}
if err != nil {
stateLog.Errorf("failed to load state tree: %v %v", c, err)
return nil, fmt.Errorf("failed to load state tree: %v", err)
return nil, fmt.Errorf("failed to load state tree: %v %v", c, err)
}

s := &State{
Expand Down

0 comments on commit 04c4a18

Please sign in to comment.