diff --git a/storage/sealer/manager.go b/storage/sealer/manager.go index 336664ca819..db5f9a589ba 100644 --- a/storage/sealer/manager.go +++ b/storage/sealer/manager.go @@ -289,14 +289,20 @@ func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.remoteHnd.ServeHTTP(w, r) } -func schedNop(context.Context, Worker) error { - return nil +var schedNop = PrepareAction{ + Action: func(ctx context.Context, w Worker) error { + return nil + }, + PrepType: sealtasks.TTNoop, } -func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error { - return func(ctx context.Context, worker Worker) error { - _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) - return err +func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) PrepareAction { + return PrepareAction{ + Action: func(ctx context.Context, worker Worker) error { + _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am)) + return err + }, + PrepType: sealtasks.TTFetch, } } @@ -315,16 +321,19 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto // if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and // put it in the sealing scratch space. - sealFetch := func(ctx context.Context, worker Worker) error { - log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) - _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)) - _, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy)) - - if err != nil && err2 != nil { - return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2) - } + sealFetch := PrepareAction{ + Action: func(ctx context.Context, worker Worker) error { + log.Debugf("copy sealed/cache sector data for sector %d", sector.ID) + _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)) + _, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy)) + + if err != nil && err2 != nil { + return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2) + } - return nil + return nil + }, + PrepType: sealtasks.TTFetch, } if unsealed == nil { diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go index 4f167eae325..c0ac11bcf9d 100644 --- a/storage/sealer/sched.go +++ b/storage/sealer/sched.go @@ -42,6 +42,10 @@ func WithPriority(ctx context.Context, priority int) context.Context { const mib = 1 << 20 type WorkerAction func(ctx context.Context, w Worker) error +type PrepareAction struct { + Action WorkerAction + PrepType sealtasks.TaskType +} type SchedWorker interface { TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) @@ -130,7 +134,7 @@ type WorkerRequest struct { Sel WorkerSelector SchedId uuid.UUID - prepare WorkerAction + prepare PrepareAction work WorkerAction start time.Time @@ -197,7 +201,7 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) { }, nil } -func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error { +func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare PrepareAction, work WorkerAction) error { ret := make(chan workerResponse) select { @@ -247,6 +251,13 @@ func (r *WorkerRequest) SealTask() sealtasks.SealTaskType { } } +func (r *WorkerRequest) PrepSealTask() sealtasks.SealTaskType { + return sealtasks.SealTaskType{ + TaskType: r.prepare.PrepType, + RegisteredSealProof: r.Sector.ProofType, + } +} + type SchedDiagRequestInfo struct { Sector abi.SectorID TaskType sealtasks.TaskType diff --git a/storage/sealer/sched_test.go b/storage/sealer/sched_test.go index 5cf1a35b1b2..07731e934c6 100644 --- a/storage/sealer/sched_test.go +++ b/storage/sealer/sched_test.go @@ -288,25 +288,30 @@ func TestSched(t *testing.T) { ProofType: spt, } - err := sched.Schedule(ctx, sectorRef, taskType, sel, func(ctx context.Context, w Worker) error { - wi, err := w.Info(ctx) - require.NoError(t, err) + prep := PrepareAction{ + Action: func(ctx context.Context, w Worker) error { + wi, err := w.Info(ctx) + require.NoError(t, err) - require.Equal(t, expectWorker, wi.Hostname) + require.Equal(t, expectWorker, wi.Hostname) - log.Info("IN ", taskName) + log.Info("IN ", taskName) - for { - _, ok := <-done - if !ok { - break + for { + _, ok := <-done + if !ok { + break + } } - } - log.Info("OUT ", taskName) + log.Info("OUT ", taskName) - return nil - }, noopAction) + return nil + }, + PrepType: taskType, + } + + err := sched.Schedule(ctx, sectorRef, taskType, sel, prep, noopAction) if err != context.Canceled { require.NoError(t, err, fmt.Sprint(l, l2)) } diff --git a/storage/sealer/sched_worker.go b/storage/sealer/sched_worker.go index 20918e774ac..b6efc851ac5 100644 --- a/storage/sealer/sched_worker.go +++ b/storage/sealer/sched_worker.go @@ -354,8 +354,8 @@ assignLoop: worker.lk.Lock() for t, todo := range firstWindow.Todo { - needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType) - if worker.preparing.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) { + needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType) + if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) { tidx = t break } @@ -454,20 +454,21 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error { w, sh := sw.worker, sw.sched needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType) + needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType) w.lk.Lock() - w.preparing.Add(req.SealTask(), w.Info.Resources, needRes) + w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep) w.lk.Unlock() go func() { // first run the prepare step (e.g. fetching sector data from other worker) tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc) tw.start() - err := req.prepare(req.Ctx, tw) + err := req.prepare.Action(req.Ctx, tw) w.lk.Lock() if err != nil { - w.preparing.Free(req.SealTask(), w.Info.Resources, needRes) + w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep) w.lk.Unlock() select { @@ -497,7 +498,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error { // wait (if needed) for resources in the 'active' window err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error { - w.preparing.Free(req.SealTask(), w.Info.Resources, needRes) + w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep) w.lk.Unlock() defer w.lk.Lock() // we MUST return locked from this function diff --git a/storage/sealer/sealtasks/task.go b/storage/sealer/sealtasks/task.go index bbf33b159ab..2e134f3d38d 100644 --- a/storage/sealer/sealtasks/task.go +++ b/storage/sealer/sealtasks/task.go @@ -36,6 +36,8 @@ const ( TTGenerateWindowPoSt TaskType = "post/v0/windowproof" TTGenerateWinningPoSt TaskType = "post/v0/winningproof" + + TTNoop TaskType = "" ) var order = map[TaskType]int{ diff --git a/storage/sealer/storiface/worker.go b/storage/sealer/storiface/worker.go index 3cbf9f737f4..2badad2923b 100644 --- a/storage/sealer/storiface/worker.go +++ b/storage/sealer/storiface/worker.go @@ -65,6 +65,20 @@ func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks return res } +// PrepResourceSpec is like ResourceSpec, but meant for use limiting parallel preparing +// tasks. +func (wr WorkerResources) PrepResourceSpec(spt abi.RegisteredSealProof, tt, prepTT sealtasks.TaskType) Resources { + res := wr.ResourceSpec(spt, tt) + + if prepTT != tt && prepTT != sealtasks.TTNoop { + prepRes := wr.ResourceSpec(spt, prepTT) + res.MaxConcurrent = prepRes.MaxConcurrent + } + + // otherwise, use the default resource table + return res +} + type WorkerStats struct { Info WorkerInfo Tasks []sealtasks.TaskType