diff --git a/api/api_full.go b/api/api_full.go
index 311ebbfed72..8571acc035e 100644
--- a/api/api_full.go
+++ b/api/api_full.go
@@ -172,9 +172,9 @@ type FullNode interface {
 	// If oldmsgskip is set, messages from before the requested roots are also not included.
 	ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
 
-	ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) (<-chan []byte, error) //perm:read
+	ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) (<-chan []byte, error) //perm:read
 
-	ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *ChainExportConfig) error //perm:read
+	ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg ChainExportConfig) error //perm:read
 
 	// ChainPrune prunes the stored chain state and garbage collects; only supported if you
 	// are using the splitstore
diff --git a/api/proxy_gen.go b/api/proxy_gen.go
index c351a4fabc9..a6b3d9a18d7 100644
--- a/api/proxy_gen.go
+++ b/api/proxy_gen.go
@@ -7,15 +7,6 @@ import (
 	"encoding/json"
 	"time"
 
-	"github.com/google/uuid"
-	"github.com/ipfs/go-cid"
-	blocks "github.com/ipfs/go-libipfs/blocks"
-	"github.com/libp2p/go-libp2p/core/metrics"
-	"github.com/libp2p/go-libp2p/core/network"
-	"github.com/libp2p/go-libp2p/core/peer"
-	"github.com/libp2p/go-libp2p/core/protocol"
-	"golang.org/x/xerrors"
-
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-bitfield"
 	datatransfer "github.com/filecoin-project/go-data-transfer"
@@ -31,7 +22,6 @@ import (
 	"github.com/filecoin-project/go-state-types/dline"
 	abinetwork "github.com/filecoin-project/go-state-types/network"
 	"github.com/filecoin-project/go-state-types/proof"
-
 	apitypes "github.com/filecoin-project/lotus/api/types"
 	builtinactors "github.com/filecoin-project/lotus/chain/actors/builtin"
 	lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@@ -44,6 +34,14 @@ import (
 	"github.com/filecoin-project/lotus/storage/sealer/fsutil"
 	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
 	"github.com/filecoin-project/lotus/storage/sealer/storiface"
+	"github.com/google/uuid"
+	"github.com/ipfs/go-cid"
+	blocks "github.com/ipfs/go-libipfs/blocks"
+	"github.com/libp2p/go-libp2p/core/metrics"
+	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/protocol"
+	"golang.org/x/xerrors"
 )
 
 var ErrNotSupported = xerrors.New("method not supported")
@@ -119,9 +117,9 @@ type FullNodeStruct struct {
 
 		ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
 
-		ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) `perm:"read"`
+		ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) `perm:"read"`
 
-		ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error `perm:"read"`
+		ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error `perm:"read"`
 
 		ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
 
@@ -1388,25 +1386,25 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo
 	return nil, ErrNotSupported
 }
 
-func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) {
+func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) {
 	if s.Internal.ChainExportRange == nil {
 		return nil, ErrNotSupported
 	}
 	return s.Internal.ChainExportRange(p0, p1, p2, p3)
 }
 
-func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) (<-chan []byte, error) {
+func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) (<-chan []byte, error) {
 	return nil, ErrNotSupported
 }
 
-func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error {
+func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error {
 	if s.Internal.ChainExportRangeInternal == nil {
 		return ErrNotSupported
 	}
 	return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3)
 }
 
-func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *ChainExportConfig) error {
+func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 ChainExportConfig) error {
 	return ErrNotSupported
 }
 
diff --git a/api/types.go b/api/types.go
index d8495d29efc..cababc743b8 100644
--- a/api/types.go
+++ b/api/types.go
@@ -399,9 +399,10 @@ func (m *MsgUuidMapType) UnmarshalJSON(b []byte) error {
 	return nil
 }
 
+// ChainExportConfig holds configuration for chain ranged exports.
 type ChainExportConfig struct {
 	WriteBufferSize int
-	Workers         int64
+	NumWorkers      int
 	CacheSize       int
 	IncludeMessages bool
 	IncludeReceipts bool
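Since ChainExportConfig now travels by value, callers construct it directly and the server side no longer needs a nil check; the zero value is also a usable default. A hedged caller-side sketch follows (the numbers are illustrative placeholders, not tuned defaults; IncludeStateRoots is the struct's remaining field, referenced later in node/impl/full/chain.go):

package main

import (
	"fmt"

	lapi "github.com/filecoin-project/lotus/api"
)

func main() {
	// Illustrative values only; not recommended defaults.
	cfg := lapi.ChainExportConfig{
		WriteBufferSize:   1 << 20, // 1 MiB of buffered writes
		NumWorkers:        4,       // renamed from Workers (int64) to NumWorkers (int)
		CacheSize:         1 << 20, // sizes the caching blockstore used by the walk
		IncludeMessages:   true,
		IncludeReceipts:   true,
		IncludeStateRoots: false,
	}
	fmt.Printf("%+v\n", cfg)
}

Passing by value also keeps the config immutable from the server's point of view, which is why the signatures in this diff drop the pointer.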
diff --git a/api/v0api/full.go b/api/v0api/full.go
index 648b8b12e3f..9ed6f7666a7 100644
--- a/api/v0api/full.go
+++ b/api/v0api/full.go
@@ -161,9 +161,9 @@ type FullNode interface {
 	// If oldmsgskip is set, messages from before the requested roots are also not included.
 	ChainExport(ctx context.Context, nroots abi.ChainEpoch, oldmsgskip bool, tsk types.TipSetKey) (<-chan []byte, error) //perm:read
 
-	ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) (<-chan []byte, error) //perm:read
+	ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) (<-chan []byte, error) //perm:read
 
-	ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error //perm:read
+	ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) error //perm:read
 
 	// MethodGroup: Beacon
 	// The Beacon method group contains methods for interacting with the random beacon (DRAND)
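For orientation, the generated proxies touched in this diff all follow one pattern: FullNodeStruct carries a function pointer per RPC method, annotated with its required permission; the wrapper method nil-checks before dispatching; and the Stub variant always fails fast with ErrNotSupported. A stripped-down sketch of that pattern (the Greeter names are invented for illustration):

package main

import (
	"context"
	"errors"
	"fmt"
)

var ErrNotSupported = errors.New("method not supported")

// GreeterStruct mimics the generated FullNodeStruct: a struct of
// function pointers (with permission tags) that the RPC client
// library populates at connect time.
type GreeterStruct struct {
	Internal struct {
		Hello func(ctx context.Context, name string) (string, error) `perm:"read"`
	}
}

func (s *GreeterStruct) Hello(ctx context.Context, name string) (string, error) {
	if s.Internal.Hello == nil {
		return "", ErrNotSupported
	}
	return s.Internal.Hello(ctx, name)
}

// GreeterStub mirrors FullNodeStub: every method fails fast.
type GreeterStub struct{}

func (s *GreeterStub) Hello(ctx context.Context, name string) (string, error) {
	return "", ErrNotSupported
}

func main() {
	var g GreeterStruct
	g.Internal.Hello = func(_ context.Context, name string) (string, error) {
		return "hello " + name, nil
	}
	out, _ := g.Hello(context.Background(), "world")
	fmt.Println(out)
}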
diff --git a/api/v0api/proxy_gen.go b/api/v0api/proxy_gen.go
index f5ea8455e68..a6c0920412d 100644
--- a/api/v0api/proxy_gen.go
+++ b/api/v0api/proxy_gen.go
@@ -5,11 +5,6 @@ package v0api
 import (
 	"context"
 
-	"github.com/ipfs/go-cid"
-	blocks "github.com/ipfs/go-libipfs/blocks"
-	"github.com/libp2p/go-libp2p/core/peer"
-	"golang.org/x/xerrors"
-
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-bitfield"
 	datatransfer "github.com/filecoin-project/go-data-transfer"
@@ -22,7 +17,6 @@ import (
 	"github.com/filecoin-project/go-state-types/crypto"
 	"github.com/filecoin-project/go-state-types/dline"
 	abinetwork "github.com/filecoin-project/go-state-types/network"
-
 	"github.com/filecoin-project/lotus/api"
 	apitypes "github.com/filecoin-project/lotus/api/types"
 	lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
@@ -30,6 +24,10 @@ import (
 	marketevents "github.com/filecoin-project/lotus/markets/loggers"
 	"github.com/filecoin-project/lotus/node/modules/dtypes"
 	"github.com/filecoin-project/lotus/node/repo/imports"
+	"github.com/ipfs/go-cid"
+	blocks "github.com/ipfs/go-libipfs/blocks"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"golang.org/x/xerrors"
 )
 
 var ErrNotSupported = xerrors.New("method not supported")
@@ -46,9 +44,9 @@ type FullNodeStruct struct {
 
 		ChainExport func(p0 context.Context, p1 abi.ChainEpoch, p2 bool, p3 types.TipSetKey) (<-chan []byte, error) `perm:"read"`
 
-		ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) `perm:"read"`
+		ChainExportRange func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) `perm:"read"`
 
-		ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error `perm:"read"`
+		ChainExportRangeInternal func(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error `perm:"read"`
 
 		ChainGetBlock func(p0 context.Context, p1 cid.Cid) (*types.BlockHeader, error) `perm:"read"`
 
@@ -532,25 +530,25 @@ func (s *FullNodeStub) ChainExport(p0 context.Context, p1 abi.ChainEpoch, p2 boo
 	return nil, ErrNotSupported
 }
 
-func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) {
+func (s *FullNodeStruct) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) {
 	if s.Internal.ChainExportRange == nil {
 		return nil, ErrNotSupported
 	}
 	return s.Internal.ChainExportRange(p0, p1, p2, p3)
 }
 
-func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) (<-chan []byte, error) {
+func (s *FullNodeStub) ChainExportRange(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) (<-chan []byte, error) {
 	return nil, ErrNotSupported
 }
 
-func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error {
+func (s *FullNodeStruct) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error {
 	if s.Internal.ChainExportRangeInternal == nil {
 		return ErrNotSupported
 	}
 	return s.Internal.ChainExportRangeInternal(p0, p1, p2, p3)
 }
 
-func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 *api.ChainExportConfig) error {
+func (s *FullNodeStub) ChainExportRangeInternal(p0 context.Context, p1 types.TipSetKey, p2 types.TipSetKey, p3 api.ChainExportConfig) error {
 	return ErrNotSupported
 }
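The chain/store/snapshot.go rewrite that follows is built around taskFifo, an unbounded FIFO between an in and an out channel. The point is deadlock avoidance: workers both consume tasks and enqueue new ones, so with a plain bounded channel every worker could end up blocked on a full queue at once. A self-contained sketch of the same idea, simplified with a generic element type (an illustration of the technique, not the production code):

package main

import "fmt"

// fifo decouples senders from receivers: the run loop parks overflow
// in a slice, so sends to in are never permanently blocked on a
// slow receiver.
type fifo[T any] struct {
	in, out chan T
	buf     []T
}

func newFifo[T any](n int) *fifo[T] {
	f := &fifo[T]{in: make(chan T, n), out: make(chan T, n)}
	go f.run()
	return f
}

func (f *fifo[T]) run() {
	for {
		if len(f.buf) > 0 {
			// Either hand the head to a receiver or absorb a new item.
			select {
			case f.out <- f.buf[0]:
				f.buf = f.buf[1:]
			case v, ok := <-f.in:
				if !ok {
					// Input closed: drain the buffer, then close out.
					for _, v := range f.buf {
						f.out <- v
					}
					close(f.out)
					return
				}
				f.buf = append(f.buf, v)
			}
		} else {
			v, ok := <-f.in
			if !ok {
				close(f.out)
				return
			}
			// Prefer a direct handoff; fall back to buffering.
			select {
			case f.out <- v:
			default:
				f.buf = append(f.buf, v)
			}
		}
	}
}

func main() {
	f := newFifo[int](2)
	for i := 0; i < 5; i++ {
		f.in <- i // overflow beyond channel capacity lands in buf
	}
	close(f.in)
	for v := range f.out {
		fmt.Println(v)
	}
}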
diff --git a/chain/store/snapshot.go b/chain/store/snapshot.go
index eaaf921028f..4a8787e90b3 100644
--- a/chain/store/snapshot.go
+++ b/chain/store/snapshot.go
@@ -15,6 +15,7 @@ import (
 	carv2 "github.com/ipld/go-car/v2"
 	mh "github.com/multiformats/go-multihash"
 	cbg "github.com/whyrusleeping/cbor-gen"
+	"go.uber.org/atomic"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 
@@ -32,35 +33,6 @@ func (cs *ChainStore) UnionStore() bstore.Blockstore {
 	return bstore.Union(cs.stateBlockstore, cs.chainBlockstore)
 }
 
-func (cs *ChainStore) ExportRange(ctx context.Context, head, tail *types.TipSet, messages, receipts, stateroots bool, workers int64, cacheSize int, w io.Writer) error {
-	h := &car.CarHeader{
-		Roots:   head.Cids(),
-		Version: 1,
-	}
-
-	if err := car.WriteHeader(h, w); err != nil {
-		return xerrors.Errorf("failed to write car header: %s", err)
-	}
-
-	cacheStore, err := NewCachingBlockstore(cs.UnionStore(), cacheSize)
-	if err != nil {
-		return err
-	}
-	return cs.WalkSnapshotRange(ctx, cacheStore, head, tail, messages, receipts, stateroots, workers, func(c cid.Cid) error {
-		blk, err := cacheStore.Get(ctx, c)
-		if err != nil {
-			return xerrors.Errorf("writing object to car, bs.Get: %w", err)
-		}
-
-		if err := carutil.LdWrite(w, c.Bytes(), blk.RawData()); err != nil {
-			return xerrors.Errorf("failed to write block to car output: %w", err)
-		}
-
-		return nil
-	})
-
-}
-
 func (cs *ChainStore) Export(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs bool, w io.Writer) error {
 	h := &car.CarHeader{
 		Roots:   ts.Cids(),
@@ -165,67 +137,228 @@ func (cs *ChainStore) Import(ctx context.Context, r io.Reader) (*types.TipSet, e
 	return root, nil
 }
 
+type walkSchedTaskType int
+
+const (
+	finishTask walkSchedTaskType = -1
+	blockTask  walkSchedTaskType = iota
+	messageTask
+	receiptTask
+	stateTask
+	dagTask
+)
+
+func (t walkSchedTaskType) String() string {
+	switch t {
+	case finishTask:
+		return "finish"
+	case blockTask:
+		return "block"
+	case messageTask:
+		return "message"
+	case receiptTask:
+		return "receipt"
+	case stateTask:
+		return "state"
+	case dagTask:
+		return "dag"
+	}
+	panic(fmt.Sprintf("unknown task %d", t))
+}
+
 type walkTask struct {
 	c        cid.Cid
-	taskType taskType
+	taskType walkSchedTaskType
 }
 
-type walkResult struct {
+// an ever growing FIFO
+type taskFifo struct {
+	in   chan walkTask
+	out  chan walkTask
+	fifo []walkTask
+}
+
+type taskResult struct {
 	c cid.Cid
+	b blocks.Block
+}
+
+func newTaskFifo(bufferLen int) *taskFifo {
+	f := taskFifo{
+		in:   make(chan walkTask, bufferLen),
+		out:  make(chan walkTask, bufferLen),
+		fifo: make([]walkTask, 0),
+	}
+
+	go f.run()
+
+	return &f
+}
+
+func (f *taskFifo) Close() error {
+	close(f.in)
+	return nil
+}
+
+func (f *taskFifo) run() {
+	for {
+		if len(f.fifo) > 0 {
+			// we have items in slice
+			// try to put next out or read something in.
+			// blocks if nothing works.
+			next := f.fifo[0]
+			select {
+			case f.out <- next:
+				f.fifo = f.fifo[1:]
+			case elem, ok := <-f.in:
+				if !ok {
+					// drain and close out.
+					for _, elem := range f.fifo {
+						f.out <- elem
+					}
+					close(f.out)
+					return
+				}
+				f.fifo = append(f.fifo, elem)
+			}
+		} else {
+			// no elements in fifo to put out.
+			// Try to read in and block.
+			// When done, try to put out or add to fifo.
+			select {
+			case elem, ok := <-f.in:
+				if !ok {
+					close(f.out)
+					return
+				}
+				select {
+				case f.out <- elem:
+				default:
+					f.fifo = append(f.fifo, elem)
+				}
+			}
+		}
+	}
+}
 
 type walkSchedulerConfig struct {
-	numWorkers int64
-	tail       *types.TipSet
+	numWorkers int
+
+	head *types.TipSet // Tipset to start walking from.
+	tail *types.TipSet // Tipset to end at.
+
 	includeMessages bool
 	includeReceipts bool
 	includeState    bool
 }
 
-func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg *walkSchedulerConfig, rootTasks ...*walkTask) (*walkScheduler, context.Context) {
-	tailSet := cid.NewSet()
-	for i := range cfg.tail.Cids() {
-		tailSet.Add(cfg.tail.Cids()[i])
-	}
-	grp, ctx := errgroup.WithContext(ctx)
-	s := &walkScheduler{
-		store:      store,
-		numWorkers: cfg.numWorkers,
-		stack:      rootTasks,
-		in:         make(chan *walkTask, cfg.numWorkers*64),
-		out:        make(chan *walkTask, cfg.numWorkers*64),
-		grp:        grp,
-		tail:       tailSet,
-		cfg:        cfg,
-	}
-	s.taskWg.Add(len(rootTasks))
-	return s, ctx
-}
-
 type walkScheduler struct {
-	store bstore.Blockstore
-	// number of worker routine to spawn
-	numWorkers int64
-	// buffer holds tasks until they are processed
-	stack []*walkTask
-	// inbound and outbound tasks
-	in, out chan *walkTask
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	store  bstore.Blockstore
+	cfg    walkSchedulerConfig
+	writer io.Writer
+
+	workerTasks    *taskFifo
+	totalTasks     atomic.Int64
+	results        chan taskResult
+	writeErrorChan chan error
+
 	// tracks number of inflight tasks
-	taskWg sync.WaitGroup
+	//taskWg sync.WaitGroup
 	// launches workers and collects errors if any occur
-	grp *errgroup.Group
-	// set of tasks seen
+	workers *errgroup.Group
+	// set of CIDs already exported
 	seen sync.Map
+}
+
+func newWalkScheduler(ctx context.Context, store bstore.Blockstore, cfg walkSchedulerConfig, w io.Writer) (*walkScheduler, error) {
+	ctx, cancel := context.WithCancel(ctx)
+	workers, ctx := errgroup.WithContext(ctx)
+	s := &walkScheduler{
+		ctx:            ctx,
+		cancel:         cancel,
+		store:          store,
+		cfg:            cfg,
+		writer:         w,
+		results:        make(chan taskResult, cfg.numWorkers*64),
+		workerTasks:    newTaskFifo(cfg.numWorkers * 64),
+		writeErrorChan: make(chan error, 1),
+		workers:        workers,
+	}
+
+	go func() {
+		defer close(s.writeErrorChan)
+		for r := range s.results {
+			// Write
+			if err := carutil.LdWrite(s.writer, r.c.Bytes(), r.b.RawData()); err != nil {
+				// abort operations
				cancel()
+				s.writeErrorChan <- err
+			}
+		}
+	}()
+
+	// workers
+	for i := 0; i < cfg.numWorkers; i++ {
+		f := func(n int) func() error {
+			return func() error {
+				return s.workerFunc(n)
+			}
+		}(i)
+		s.workers.Go(f)
+	}
+
+	s.totalTasks.Add(int64(len(cfg.head.Blocks())))
+	for _, b := range cfg.head.Blocks() {
+		select {
+		case <-ctx.Done():
+			log.Errorw("context done while sending root tasks", "error", ctx.Err())
+			cancel() // kill workers
+			return nil, ctx.Err()
+		case s.workerTasks.in <- walkTask{
+			c:        b.Cid(),
+			taskType: blockTask,
+		}:
+		}
+	}
 
-	tail *cid.Set
-	cfg  *walkSchedulerConfig
+	return s, nil
 }
 
 func (s *walkScheduler) Wait() error {
-	return s.grp.Wait()
+	err := s.workers.Wait()
+	// all workers done. One would have reached genesis and notified the
+	// rest to exit. Yet, there might be some pending tasks in the queue,
+	// so we need to run a "single worker".
+	if err != nil {
+		log.Errorw("export workers finished with error", "error", err)
+	}
+
+	for {
+		if n := s.totalTasks.Load(); n == 0 {
+			break // finally fully done
+		}
+		select {
+		case task := <-s.workerTasks.out:
+			s.totalTasks.Add(-1)
+			if err != nil {
+				continue // just drain if errors happened.
+			}
+			err = s.processTask(task, 0)
+		}
+	}
+	close(s.results)
+	errWrite := <-s.writeErrorChan
+	if errWrite != nil {
+		log.Errorw("error writing to CAR file", "error", errWrite)
+		return errWrite
+	}
+	s.workerTasks.Close()
+	return err
 }
 
-func (s *walkScheduler) enqueueIfNew(task *walkTask) {
+func (s *walkScheduler) enqueueIfNew(task walkTask) {
 	if task.c.Prefix().MhType == mh.IDENTITY {
 		//log.Infow("ignored", "cid", todo.c.String())
 		return
@@ -234,118 +367,68 @@ func (s *walkScheduler) enqueueIfNew(task *walkTask) {
 		//log.Infow("ignored", "cid", todo.c.String())
 		return
 	}
-	if _, ok := s.seen.Load(task.c); ok {
+	if _, loaded := s.seen.LoadOrStore(task.c, struct{}{}); loaded {
+		// we already had it on the map
 		return
 	}
+
 	log.Debugw("enqueue", "type", task.taskType.String(), "cid", task.c.String())
-	s.taskWg.Add(1)
-	s.seen.Store(task.c, struct{}{})
-	s.in <- task
+	s.totalTasks.Add(1)
+	s.workerTasks.in <- task
 }
 
-func (s *walkScheduler) startScheduler(ctx context.Context) {
-	s.grp.Go(func() error {
-		defer func() {
-			log.Infow("walkScheduler shutting down")
-			close(s.out)
-
-			// Because the workers may have exited early (due to the context being canceled).
-			for range s.out {
-				s.taskWg.Done()
-			}
-			log.Info("closed scheduler out wait group")
+func (s *walkScheduler) sendFinish(workerN int) error {
+	log.Infow("worker finished work", "worker", workerN)
+	s.totalTasks.Add(1)
+	s.workerTasks.in <- walkTask{
+		taskType: finishTask,
+	}
+	return nil
+}
 
-			// Because the workers may have enqueued additional tasks.
-			for range s.in {
-				s.taskWg.Done()
-			}
-			log.Info("closed scheduler in wait group")
-
-			// now, the waitgroup should be at 0, and the goroutine that was _waiting_ on it should have exited.
-			log.Infow("walkScheduler stopped")
-		}()
-		go func() {
-			s.taskWg.Wait()
-			close(s.in)
-			log.Info("closed scheduler in channel")
-		}()
-		for {
-			if n := len(s.stack) - 1; n >= 0 {
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				case newJob, ok := <-s.in:
-					if !ok {
-						return nil
-					}
-					s.stack = append(s.stack, newJob)
-				case s.out <- s.stack[n]:
-					s.stack[n] = nil
-					s.stack = s.stack[:n]
-				}
-			} else {
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				case newJob, ok := <-s.in:
-					if !ok {
-						return nil
-					}
-					s.stack = append(s.stack, newJob)
-				}
+func (s *walkScheduler) workerFunc(workerN int) error {
+	log.Infow("starting worker", "worker", workerN)
+	for t := range s.workerTasks.out {
+		s.totalTasks.Add(-1)
+		select {
+		case <-s.ctx.Done():
+			return s.ctx.Err()
+		default:
+			// A worker reached genesis, so we wind down and let others do
+			// the same. Exit.
+			if t.taskType == finishTask {
+				return s.sendFinish(workerN)
 			}
 		}
-	})
-}
 
-func (s *walkScheduler) startWorkers(ctx context.Context, out chan *walkResult) {
-	for i := int64(0); i < s.numWorkers; i++ {
-		s.grp.Go(func() error {
-			for task := range s.out {
-				if err := s.work(ctx, task, out); err != nil {
-					return err
-				}
-			}
-			return nil
-		})
+		err := s.processTask(t, workerN)
+		if err != nil {
+			return err
+		}
+		// continue
 	}
+	return nil
 }
 
-type taskType int
-
-func (t taskType) String() string {
-	switch t {
-	case Block:
-		return "block"
-	case Message:
-		return "message"
-	case Receipt:
-		return "receipt"
-	case State:
-		return "state"
-	case Dag:
-		return "dag"
+func (s *walkScheduler) processTask(t walkTask, workerN int) error {
+	if t.taskType == finishTask {
+		return nil
 	}
-	panic(fmt.Sprintf("unknow task %d", t))
-}
 
-const (
-	Block taskType = iota
-	Message
-	Receipt
-	State
-	Dag
-)
+	blk, err := s.store.Get(s.ctx, t.c)
+	if err != nil {
+		return xerrors.Errorf("writing object to car, bs.Get: %w", err)
+	}
 
-func (s *walkScheduler) work(ctx context.Context, todo *walkTask, results chan *walkResult) error {
-	defer s.taskWg.Done()
-	// unseen cid, its a result
-	results <- &walkResult{c: todo.c}
+	s.results <- taskResult{
+		c: t.c,
+		b: blk,
+	}
 
 	// extract relevant dags to walk from the block
-	if todo.taskType == Block {
-		blk := todo.c
-		data, err := s.store.Get(ctx, blk)
+	if t.taskType == blockTask {
+		blk := t.c
+		data, err := s.store.Get(s.ctx, blk)
 		if err != nil {
 			return err
 		}
@@ -359,128 +442,122 @@ func (s *walkScheduler) work(ctx context.Context, todo *walkTask, results chan *
 		if b.Height == 0 {
 			log.Info("exporting genesis block")
 			for i := range b.Parents {
-				s.enqueueIfNew(&walkTask{
+				s.enqueueIfNew(walkTask{
 					c:        b.Parents[i],
-					taskType: Dag,
+					taskType: dagTask,
 				})
 			}
-			s.enqueueIfNew(&walkTask{
+			s.enqueueIfNew(walkTask{
 				c:        b.ParentStateRoot,
-				taskType: State,
+				taskType: stateTask,
 			})
-			return nil
+
+			return s.sendFinish(workerN)
 		}
 		// enqueue block parents
 		for i := range b.Parents {
-			s.enqueueIfNew(&walkTask{
+			s.enqueueIfNew(walkTask{
 				c:        b.Parents[i],
-				taskType: Block,
+				taskType: blockTask,
 			})
 		}
 		if s.cfg.tail.Height() >= b.Height {
-			log.Debugw("tail reached", "cid", blk.String())
+			log.Debugw("tail reached: only blocks will be exported from now until genesis", "cid", blk.String())
 			return nil
 		}
 
 		if s.cfg.includeMessages {
 			// enqueue block messages
-			s.enqueueIfNew(&walkTask{
+			s.enqueueIfNew(walkTask{
 				c:        b.Messages,
-				taskType: Message,
+				taskType: messageTask,
 			})
 		}
 		if s.cfg.includeReceipts {
 			// enqueue block receipts
-			s.enqueueIfNew(&walkTask{
+			s.enqueueIfNew(walkTask{
 				c:        b.ParentMessageReceipts,
-				taskType: Receipt,
+				taskType: receiptTask,
 			})
 		}
 		if s.cfg.includeState {
-			s.enqueueIfNew(&walkTask{
+			s.enqueueIfNew(walkTask{
 				c:        b.ParentStateRoot,
-				taskType: State,
+				taskType: stateTask,
 			})
 		}
 
 		return nil
 	}
-	data, err := s.store.Get(ctx, todo.c)
-	if err != nil {
-		return err
-	}
-	return cbg.ScanForLinks(bytes.NewReader(data.RawData()), func(c cid.Cid) {
-		if todo.c.Prefix().Codec != cid.DagCBOR || todo.c.Prefix().MhType == mh.IDENTITY {
+
+	// Not a chain-block: we scan for CIDs in the raw block-data
+	return cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
+		if t.c.Prefix().Codec != cid.DagCBOR || t.c.Prefix().MhType == mh.IDENTITY {
 			return
 		}
-		s.enqueueIfNew(&walkTask{
+		s.enqueueIfNew(walkTask{
 			c:        c,
-			taskType: Dag,
+			taskType: dagTask,
 		})
 	})
 }
 
-func (cs *ChainStore) WalkSnapshotRange(ctx context.Context, store bstore.Blockstore, head, tail *types.TipSet, messages, receipts, stateroots bool, workers int64, cb func(cid.Cid) error) error {
-	start := time.Now()
-	log.Infow("walking snapshot range", "head", head.Key(), "tail", tail.Key(), "messages", messages, "receipts", receipts, "stateroots", stateroots, "workers", workers, "start", start)
-	var tasks []*walkTask
-	for i := range head.Blocks() {
-		tasks = append(tasks, &walkTask{
-			c:        head.Blocks()[i].Cid(),
-			taskType: 0,
-		})
+func (cs *ChainStore) ExportRange(
+	ctx context.Context,
+	w io.Writer,
+	head, tail *types.TipSet,
+	messages, receipts, stateroots bool,
+	workers int,
+	cacheSize int) error {
+
+	h := &car.CarHeader{
+		Roots:   head.Cids(),
+		Version: 1,
+	}
+
+	if err := car.WriteHeader(h, w); err != nil {
+		return xerrors.Errorf("failed to write car header: %s", err)
 	}
 
-	cfg := &walkSchedulerConfig{
+	cacheStore, err := NewCachingBlockstore(cs.UnionStore(), cacheSize)
+	if err != nil {
+		return err
+	}
+
+	start := time.Now()
+	log.Infow("walking snapshot range",
+		"head", head.Key(),
+		"tail", tail.Key(),
+		"messages", messages,
+		"receipts", receipts,
+		"stateroots", stateroots,
+		"workers", workers)
+
+	cfg := walkSchedulerConfig{
 		numWorkers:      workers,
+		head:            head,
 		tail:            tail,
 		includeMessages: messages,
 		includeState:    stateroots,
 		includeReceipts: receipts,
 	}
 
-	pw, ctx := newWalkScheduler(ctx, store, cfg, tasks...)
-	// create a buffered channel for exported CID's scaled on the number of workers.
-	results := make(chan *walkResult, workers*64)
-
-	pw.startScheduler(ctx)
-	// workers accept channel and write results to it.
-	pw.startWorkers(ctx, results)
+	pw, err := newWalkScheduler(ctx, cacheStore, cfg, w)
+	if err != nil {
+		return err
+	}
 
-	// used to wait until result channel has been drained.
-	resultsDone := make(chan struct{})
-	var cbErr error
-	go func() {
-		// signal we are done draining results when this method exits.
-		defer close(resultsDone)
-		// drain the results channel until is closes.
-		for res := range results {
-			if err := cb(res.c); err != nil {
-				log.Errorw("export range callback error", "error", err)
-				cbErr = err
-				return
-			}
-		}
-	}()
 	// wait until all workers are done.
-	err := pw.Wait()
+	err = pw.Wait()
 	if err != nil {
 		log.Errorw("walker scheduler", "error", err)
+		return err
 	}
 
-	// workers are done, close the results channel.
-	close(results)
-	// wait until all results have been consumed before exiting (its buffered).
-	<-resultsDone
-	// if there was a callback error return it.
-	if cbErr != nil {
-		return cbErr
-	}
 	log.Infow("walking snapshot range complete", "duration", time.Since(start), "success", err == nil)
-
-	// return any error encountered by the walker.
-	return err
+	return nil
 }
 
 func (cs *ChainStore) WalkSnapshot(ctx context.Context, ts *types.TipSet, inclRecentRoots abi.ChainEpoch, skipOldMsgs, skipMsgReceipts bool, cb func(cid.Cid) error) error {
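One architectural note on the code above: only the goroutine started in newWalkScheduler ever touches the io.Writer, while the errgroup workers fan results into the results channel, so CAR records can never interleave. A minimal sketch of that fan-in/single-writer shape (strings stand in for the (cid, block) results; all names here are illustrative):

package main

import (
	"bufio"
	"fmt"
	"os"

	"golang.org/x/sync/errgroup"
)

func main() {
	results := make(chan string, 16)
	writeErr := make(chan error, 1)

	// Dedicated writer goroutine: sole owner of the writer, like the
	// goroutine in newWalkScheduler that calls carutil.LdWrite.
	w := bufio.NewWriter(os.Stdout)
	go func() {
		defer close(writeErr)
		for r := range results {
			if _, err := fmt.Fprintln(w, r); err != nil {
				writeErr <- err
				return
			}
		}
	}()

	// Worker pool, as with walkScheduler's errgroup.
	var workers errgroup.Group
	for i := 0; i < 4; i++ {
		i := i
		workers.Go(func() error {
			// Stand-in for processTask: fetch a block, emit a result.
			results <- fmt.Sprintf("result from worker %d", i)
			return nil
		})
	}

	err := workers.Wait() // all producers are done...
	close(results)        // ...so the writer can drain and exit.
	if werr := <-writeErr; werr != nil && err == nil {
		err = werr // surface write errors separately, as Wait() does
	}
	w.Flush() //nolint:errcheck
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}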
diff --git a/cli/chain.go b/cli/chain.go
index e0cc62ac953..9c59a7dfa44 100644
--- a/cli/chain.go
+++ b/cli/chain.go
@@ -6,6 +6,7 @@ import (
 	"encoding/base64"
 	"encoding/hex"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -1153,12 +1154,12 @@ var ChainExportRangeCmd = &cli.Command{
 	Flags: []cli.Flag{
 		&cli.StringFlag{
 			Name:  "head",
-			Usage: "specify tipset to start the export from",
+			Usage: "specify tipset to start the export from (higher epoch)",
 			Value: "@head",
 		},
 		&cli.StringFlag{
 			Name:  "tail",
-			Usage: "specify tipset to end the export at",
+			Usage: "specify tipset to end the export at (lower epoch)",
 			Value: "@tail",
 		},
 		&cli.BoolFlag{
@@ -1176,7 +1177,7 @@ var ChainExportRangeCmd = &cli.Command{
 			Usage: "specify if stateroots should be include",
 			Value: false,
 		},
-		&cli.Int64Flag{
+		&cli.IntFlag{
 			Name:  "workers",
 			Usage: "specify the number of workers",
 			Value: 1,
@@ -1234,10 +1235,14 @@ var ChainExportRangeCmd = &cli.Command{
 			}
 		}
 
+		if head.Height() < tail.Height() {
+			return errors.New("height of --head tipset must be greater than or equal to the height of the --tail tipset")
+		}
+
 		if cctx.Bool("internal") {
-			if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), &lapi.ChainExportConfig{
+			if err := api.ChainExportRangeInternal(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
 				WriteBufferSize: cctx.Int("write-buffer"),
-				Workers:         cctx.Int64("workers"),
+				NumWorkers:      cctx.Int("workers"),
 				CacheSize:       cctx.Int("cache-size"),
 				IncludeMessages: cctx.Bool("messages"),
 				IncludeReceipts: cctx.Bool("receipts"),
@@ -1248,9 +1253,9 @@ var ChainExportRangeCmd = &cli.Command{
 			return nil
 		}
 
-		stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), &lapi.ChainExportConfig{
+		stream, err := api.ChainExportRange(ctx, head.Key(), tail.Key(), lapi.ChainExportConfig{
 			WriteBufferSize: cctx.Int("write-buffer"),
-			Workers:         cctx.Int64("workers"),
+			NumWorkers:      cctx.Int("workers"),
 			CacheSize:       cctx.Int("cache-size"),
 			IncludeMessages: cctx.Bool("messages"),
 			IncludeReceipts: cctx.Bool("receipts"),
diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go
index 6d47518ee42..6e8b89e54f4 100644
--- a/node/impl/full/chain.go
+++ b/node/impl/full/chain.go
@@ -9,6 +9,7 @@ import (
 	"io"
 	"math"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"sync"
@@ -592,7 +593,7 @@ func (m *ChainModule) ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.M
 	return cm.VMMessage(), nil
 }
 
-func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) error {
+func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) error {
 	headTs, err := a.Chain.GetTipSetFromKey(ctx, head)
 	if err != nil {
 		return xerrors.Errorf("loading tipset %s: %w", head, err)
@@ -601,10 +602,18 @@ func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types
 	if err != nil {
 		return xerrors.Errorf("loading tipset %s: %w", tail, err)
 	}
-	f, err := os.Create(fmt.Sprintf("./snapshot_%d_%d_%d.car", tailTs.Height(), headTs.Height(), time.Now().Unix()))
+	fileName := fmt.Sprintf("./snapshot_%d_%d_%d.car", tailTs.Height(), headTs.Height(), time.Now().Unix())
+	absFileName, err := filepath.Abs(fileName)
 	if err != nil {
 		return err
 	}
+
+	f, err := os.Create(fileName)
+	if err != nil {
+		return err
+	}
+
+	log.Infow("Exporting chain range", "path", absFileName)
 
 	// buffer writes to the chain export file.
 	bw := bufio.NewWriterSize(f, cfg.WriteBufferSize)
@@ -617,14 +626,20 @@ func (a ChainAPI) ChainExportRangeInternal(ctx context.Context, head, tail types
 		}
 	}()
 
-	if err := a.Chain.ExportRange(ctx, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, cfg.Workers, cfg.CacheSize, bw); err != nil {
+	if err := a.Chain.ExportRange(ctx,
+		bw,
+		headTs, tailTs,
+		cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots,
+		cfg.NumWorkers, cfg.CacheSize,
+	); err != nil {
 		return fmt.Errorf("exporting chain range: %w", err)
 	}
 
+	// FIXME: return progress.
 	return nil
 }
 
-func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg *api.ChainExportConfig) (<-chan []byte, error) {
+func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetKey, cfg api.ChainExportConfig) (<-chan []byte, error) {
 	headTs, err := a.Chain.GetTipSetFromKey(ctx, head)
 	if err != nil {
 		return nil, xerrors.Errorf("loading tipset %s: %w", head, err)
@@ -637,8 +652,14 @@ func (a ChainAPI) ChainExportRange(ctx context.Context, head, tail types.TipSetK
 	out := make(chan []byte)
 	go func() {
 		bw := bufio.NewWriterSize(w, cfg.WriteBufferSize)
-
-		err := a.Chain.ExportRange(ctx, headTs, tailTs, cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots, cfg.Workers, cfg.CacheSize, bw)
+		err := a.Chain.ExportRange(
+			ctx,
+			bw,
+			headTs,
+			tailTs,
+			cfg.IncludeMessages, cfg.IncludeReceipts, cfg.IncludeStateRoots,
+			cfg.NumWorkers, cfg.CacheSize,
+		)
 		bw.Flush()            //nolint:errcheck // it is a write to a pipe
 		w.CloseWithError(err) //nolint:errcheck // it is a pipe
 	}()