Skip to content

Commit

Permalink
worker sched: Separate resource def for preparing window
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Feb 28, 2023
1 parent 2316363 commit c484c38
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 36 deletions.
39 changes: 24 additions & 15 deletions storage/sealer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,14 +289,20 @@ func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
m.remoteHnd.ServeHTTP(w, r)
}

func schedNop(context.Context, Worker) error {
return nil
var schedNop = PrepareAction{
Action: func(ctx context.Context, w Worker) error {
return nil
},
PrepType: sealtasks.TTNoop,
}

func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error {
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
return err
func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) PrepareAction {
return PrepareAction{
Action: func(ctx context.Context, worker Worker) error {
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
return err
},
PrepType: sealtasks.TTFetch,
}
}

Expand All @@ -315,16 +321,19 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto

// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
// put it in the sealing scratch space.
sealFetch := func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))

if err != nil && err2 != nil {
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
}
sealFetch := PrepareAction{
Action: func(ctx context.Context, worker Worker) error {
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))

if err != nil && err2 != nil {
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
}

return nil
return nil
},
PrepType: sealtasks.TTFetch,
}

if unsealed == nil {
Expand Down
15 changes: 13 additions & 2 deletions storage/sealer/sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func WithPriority(ctx context.Context, priority int) context.Context {
const mib = 1 << 20

type WorkerAction func(ctx context.Context, w Worker) error
type PrepareAction struct {
Action WorkerAction
PrepType sealtasks.TaskType
}

type SchedWorker interface {
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
Expand Down Expand Up @@ -130,7 +134,7 @@ type WorkerRequest struct {
Sel WorkerSelector
SchedId uuid.UUID

prepare WorkerAction
prepare PrepareAction
work WorkerAction

start time.Time
Expand Down Expand Up @@ -197,7 +201,7 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) {
}, nil
}

func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare PrepareAction, work WorkerAction) error {
ret := make(chan workerResponse)

select {
Expand Down Expand Up @@ -247,6 +251,13 @@ func (r *WorkerRequest) SealTask() sealtasks.SealTaskType {
}
}

func (r *WorkerRequest) PrepSealTask() sealtasks.SealTaskType {
return sealtasks.SealTaskType{
TaskType: r.prepare.PrepType,
RegisteredSealProof: r.Sector.ProofType,
}
}

type SchedDiagRequestInfo struct {
Sector abi.SectorID
TaskType sealtasks.TaskType
Expand Down
31 changes: 18 additions & 13 deletions storage/sealer/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,30 @@ func TestSched(t *testing.T) {
ProofType: spt,
}

err := sched.Schedule(ctx, sectorRef, taskType, sel, func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)
prep := PrepareAction{
Action: func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)

require.Equal(t, expectWorker, wi.Hostname)
require.Equal(t, expectWorker, wi.Hostname)

log.Info("IN ", taskName)
log.Info("IN ", taskName)

for {
_, ok := <-done
if !ok {
break
for {
_, ok := <-done
if !ok {
break
}
}
}

log.Info("OUT ", taskName)
log.Info("OUT ", taskName)

return nil
}, noopAction)
return nil
},
PrepType: taskType,
}

err := sched.Schedule(ctx, sectorRef, taskType, sel, prep, noopAction)
if err != context.Canceled {
require.NoError(t, err, fmt.Sprint(l, l2))
}
Expand Down
13 changes: 7 additions & 6 deletions storage/sealer/sched_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ assignLoop:

worker.lk.Lock()
for t, todo := range firstWindow.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if worker.preparing.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType)
if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
Expand Down Expand Up @@ -454,20 +454,21 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
w, sh := sw.worker, sw.sched

needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType)

w.lk.Lock()
w.preparing.Add(req.SealTask(), w.Info.Resources, needRes)
w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()

go func() {
// first run the prepare step (e.g. fetching sector data from other worker)
tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
tw.start()
err := req.prepare(req.Ctx, tw)
err := req.prepare.Action(req.Ctx, tw)
w.lk.Lock()

if err != nil {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()

select {
Expand Down Expand Up @@ -497,7 +498,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {

// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
w.lk.Unlock()
defer w.lk.Lock() // we MUST return locked from this function

Expand Down
2 changes: 2 additions & 0 deletions storage/sealer/sealtasks/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (

TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"

TTNoop TaskType = ""
)

var order = map[TaskType]int{
Expand Down
14 changes: 14 additions & 0 deletions storage/sealer/storiface/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks
return res
}

// PrepResourceSpec is like ResourceSpec, but meant for use limiting parallel preparing
// tasks.
func (wr WorkerResources) PrepResourceSpec(spt abi.RegisteredSealProof, tt, prepTT sealtasks.TaskType) Resources {
res := wr.ResourceSpec(spt, tt)

if prepTT != tt && prepTT != sealtasks.TTNoop {
prepRes := wr.ResourceSpec(spt, prepTT)
res.MaxConcurrent = prepRes.MaxConcurrent
}

// otherwise, use the default resource table
return res
}

type WorkerStats struct {
Info WorkerInfo
Tasks []sealtasks.TaskType
Expand Down

0 comments on commit c484c38

Please sign in to comment.