diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go
index 4b39401784a..38e901bf28d 100644
--- a/extern/sector-storage/sched.go
+++ b/extern/sector-storage/sched.go
@@ -117,7 +117,8 @@ type activeResources struct {
 	gpuUsed    bool
 	cpuUse     uint64
 
-	cond *sync.Cond
+	cond    *sync.Cond
+	waiting int
 }
 
 type workerRequest struct {
diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go
index 96a1fa8638d..7c16120c218 100644
--- a/extern/sector-storage/sched_resources.go
+++ b/extern/sector-storage/sched_resources.go
@@ -11,7 +11,9 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r
 		if a.cond == nil {
 			a.cond = sync.NewCond(locker)
 		}
+		a.waiting++
 		a.cond.Wait()
+		a.waiting--
 	}
 
 	a.add(wr.Resources, r)
@@ -19,13 +21,15 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r
 	err := cb()
 
 	a.free(wr.Resources, r)
-	if a.cond != nil {
-		a.cond.Broadcast()
-	}
 
 	return err
 }
 
+// must be called with the same lock as the one passed to withResources
+func (a *activeResources) hasWorkWaiting() bool {
+	return a.waiting > 0
+}
+
 func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
 	if r.CanGPU {
 		a.gpuUsed = true
@@ -42,6 +46,10 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
 	a.cpuUse -= r.Threads(wr.CPUs)
 	a.memUsedMin -= r.MinMemory
 	a.memUsedMax -= r.MaxMemory
+
+	if a.cond != nil {
+		a.cond.Broadcast()
+	}
 }
 
 // canHandleRequest evaluates if the worker has enough available resources to
diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go
index cca2a56c545..ad74e01ee75 100644
--- a/extern/sector-storage/sched_worker.go
+++ b/extern/sector-storage/sched_worker.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
@@ -338,6 +339,11 @@ func (sw *schedWorker) workerCompactWindows() {
 }
 
 func (sw *schedWorker) processAssignedWindows() {
+	sw.assignReadyWork()
+	sw.assignPreparingWork()
+}
+
+func (sw *schedWorker) assignPreparingWork() {
 	worker := sw.worker
 
 assignLoop:
@@ -366,7 +372,67 @@ assignLoop:
 			todo := firstWindow.todo[tidx]
 
 			log.Debugf("assign worker sector %d", todo.sector.ID.Number)
-			err := sw.startProcessingTask(sw.taskDone, todo)
+			err := sw.startProcessingTask(todo)
+
+			if err != nil {
+				log.Errorf("startProcessingTask error: %+v", err)
+				go todo.respond(xerrors.Errorf("startProcessingTask error: %w", err))
+			}
+
+			// Note: we're not freeing window.allocated resources here very much on purpose
+			copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
+			firstWindow.todo[len(firstWindow.todo)-1] = nil
+			firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
+		}
+
+		copy(worker.activeWindows, worker.activeWindows[1:])
+		worker.activeWindows[len(worker.activeWindows)-1] = nil
+		worker.activeWindows = worker.activeWindows[:len(worker.activeWindows)-1]
+
+		sw.windowsRequested--
+	}
+}
+
+func (sw *schedWorker) assignReadyWork() {
+	worker := sw.worker
+
+	worker.lk.Lock()
+	defer worker.lk.Unlock()
+
+	if worker.active.hasWorkWaiting() {
+		// prepared tasks have priority
+		return
+	}
+
+assignLoop:
+	// process windows in order
+	for len(worker.activeWindows) > 0 {
+		firstWindow := worker.activeWindows[0]
+
+		// process tasks within a window, preferring tasks at lower indexes
+		for len(firstWindow.todo) > 0 {
+			tidx := -1
+
+			for t, todo := range firstWindow.todo {
+				if todo.taskType != sealtasks.TTCommit1 && todo.taskType != sealtasks.TTCommit2 { // todo put in task
+					continue
+				}
+
+				needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
+				if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
+					tidx = t
+					break
+				}
+			}
+
+			if tidx == -1 {
+				break assignLoop
+			}
+
+			todo := firstWindow.todo[tidx]
+
+			log.Debugf("assign worker sector %d (ready)", todo.sector.ID.Number)
+			err := sw.startProcessingReadyTask(todo)
 
 			if err != nil {
 				log.Errorf("startProcessingTask error: %+v", err)
@@ -387,7 +453,7 @@ assignLoop:
 	}
 }
 
-func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRequest) error {
+func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
 	w, sh := sw.worker, sw.sched
 
 	needRes := ResourceTable[req.taskType][req.sector.ProofType]
@@ -406,7 +472,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
 			w.lk.Unlock()
 
 			select {
-			case taskDone <- struct{}{}:
+			case sw.taskDone <- struct{}{}:
 			case <-sh.closing:
 				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
 			}
@@ -428,7 +494,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
 			defer w.lk.Lock() // we MUST return locked from this function
 
 			select {
-			case taskDone <- struct{}{}:
+			case sw.taskDone <- struct{}{}:
 			case <-sh.closing:
 			}
 
@@ -457,6 +523,46 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
 	return nil
 }
 
+func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
+	w, sh := sw.worker, sw.sched
+
+	needRes := ResourceTable[req.taskType][req.sector.ProofType]
+
+	w.active.add(w.info.Resources, needRes)
+
+	go func() {
+		// Do the work!
+		err := req.work(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
+
+		select {
+		case req.ret <- workerResponse{err: err}:
+		case <-req.ctx.Done():
+			log.Warnf("request got cancelled before we could respond")
+		case <-sh.closing:
+			log.Warnf("scheduler closed while sending response")
+		}
+
+		w.lk.Lock()
+
+		w.active.free(w.info.Resources, needRes)
+
+		select {
+		case sw.taskDone <- struct{}{}:
+		case <-sh.closing:
+			log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
+		}
+
+		w.lk.Unlock()
+
+		// This error should always be nil, since nothing is setting it, but just to be safe:
+		if err != nil {
+			log.Errorf("error executing worker (ready): %+v", err)
+		}
+	}()
+
+	return nil
+}
+
 func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
 	select {
 	case <-w.closingMgr:
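
Editor's note on the scheduling change above: processAssignedWindows is split into two passes. assignReadyWork runs first and starts tasks that need no prepare step (currently hard-coded to Commit1/Commit2), charging them straight to worker.active; it backs off entirely whenever activeResources.waiting reports a prepared task parked in withResources, so prepared work keeps priority. assignPreparingWork then handles the remaining tasks through the existing startProcessingTask path. The following is a minimal sketch of that policy, not Lotus code: the task, worker, isReady, and assignReadyWork names and the integer resource model here are simplified stand-ins invented for illustration.

// Sketch (illustrative only): ready tasks are started first, gated by a
// "waiting" counter so that prepared tasks keep priority over ready ones.
package main

import "fmt"

type taskType string

const (
	ttPreCommit1 taskType = "PC1"
	ttCommit1    taskType = "C1"
	ttCommit2    taskType = "C2"
)

type task struct {
	sector int
	typ    taskType
	need   int // simplified resource cost
}

type worker struct {
	free    int // simplified free resources
	waiting int // prepared tasks blocked on resources (cf. activeResources.waiting)
}

// isReady reports whether a task can skip the prepare step, mirroring the
// hard-coded Commit1/Commit2 check in assignReadyWork.
func isReady(t task) bool { return t.typ == ttCommit1 || t.typ == ttCommit2 }

// assignReadyWork starts the ready tasks that fit into the currently free
// resources, but does nothing while any prepared task is waiting. It returns
// the tasks left over for the preparing pass.
func assignReadyWork(w *worker, todo []task) []task {
	if w.waiting > 0 {
		return todo // prepared tasks have priority
	}
	var rest []task
	for _, t := range todo {
		if isReady(t) && t.need <= w.free {
			w.free -= t.need
			fmt.Printf("start ready task %s (sector %d)\n", t.typ, t.sector)
			continue
		}
		rest = append(rest, t)
	}
	return rest
}

func main() {
	w := &worker{free: 4}
	todo := []task{
		{sector: 1, typ: ttPreCommit1, need: 2}, // not ready, left for preparing pass
		{sector: 2, typ: ttCommit1, need: 1},    // ready and fits, starts now
		{sector: 3, typ: ttCommit2, need: 8},    // ready but does not fit, stays queued
	}
	todo = assignReadyWork(w, todo)
	fmt.Println("left for the preparing pass:", todo)
}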