Skip to content

Commit

Permalink
fix: sched: Address GET_32G_MAX_CONCURRENT regression (#10850)
Browse files Browse the repository at this point in the history
* Fix 1.21 regression: GET_32G_MAX_CONCURRENT + mixed prepared/executing leads to stuck scheduler

If you have 12 GET tasks and GET_32G_MAX_CONCURRENT=1, sealing jobs will only show assigned tasks for GET of the miner
and is stuck.
I believe this to be a regression of 1.21 unifying the counters, in the case of GETs where PrepType and TaskType
both being seal/v0/fetch leading to a state where tasks are blocked since already counted towards the limit.

* itests: Repro issue from PR #10633

* make counters int (non-working)

* fix: worker sched: Send taskDone notifs after tasks are done

* itests: Make TestPledgeMaxConcurrentGet actually reproduce the issue

* make the linter happy

---------

Co-authored-by: Steffen Butzer <[email protected]>
  • Loading branch information
magik6k and steffengy authored May 10, 2023
1 parent 298b2b4 commit 6fd93ed
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 46 deletions.
6 changes: 6 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,12 @@ workflows:
- build
suite: itest-sdr_upgrade
target: "./itests/sdr_upgrade_test.go"
- test:
name: test-itest-sealing_resources
requires:
- build
suite: itest-sealing_resources
target: "./itests/sealing_resources_test.go"
- test:
name: test-itest-sector_finalize_early
requires:
Expand Down
64 changes: 64 additions & 0 deletions itests/sealing_resources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package itests

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// Regression check for a fix introduced in https://github.com/filecoin-project/lotus/pull/10633
func TestPledgeMaxConcurrentGet(t *testing.T) {
require.NoError(t, os.Setenv("GET_2K_MAX_CONCURRENT", "1"))
t.Cleanup(func() {
require.NoError(t, os.Unsetenv("GET_2K_MAX_CONCURRENT"))
})

kit.QuietMiningLogs()

blockTime := 50 * time.Millisecond

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, miner, ens := kit.EnsembleMinimal(t, kit.NoStorage()) // no mock proofs
ens.InterconnectAll().BeginMiningMustPost(blockTime)

// separate sealed and storage paths so that finalize move needs to happen
miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanSeal = true
})
miner.AddStorage(ctx, t, func(meta *storiface.LocalStorageMeta) {
meta.CanStore = true
})

// NOTE: This test only repros the issue when Fetch tasks take ~10s, there's
// no great way to do that in a non-horribly-hacky way

/* The horribly hacky way:
diff --git a/storage/sealer/sched_worker.go b/storage/sealer/sched_worker.go
index 35acd755d..76faec859 100644
--- a/storage/sealer/sched_worker.go
+++ b/storage/sealer/sched_worker.go
@@ -513,6 +513,10 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
tw.start()
err = <-werr
+ if req.TaskType == sealtasks.TTFetch {
+ time.Sleep(10 * time.Second)
+ }
+
select {
case req.ret <- workerResponse{err: err}:
case <-req.Ctx.Done():
*/

miner.PledgeSectors(ctx, 3, 0, nil)
}
2 changes: 1 addition & 1 deletion storage/sealer/sched_assigner_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)

// TODO: allow bigger windows
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
continue
}

Expand Down
4 changes: 2 additions & 2 deletions storage/sealer/sched_assigner_darts.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []

log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)

if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}

Expand Down Expand Up @@ -71,7 +71,7 @@ func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []
"worker", bestWid,
"choices", len(choices))

windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)

rmQueue = append(rmQueue, sqi)
Expand Down
4 changes: 2 additions & 2 deletions storage/sealer/sched_assigner_spread.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [

log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)

if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}

Expand Down Expand Up @@ -71,7 +71,7 @@ func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [
"assigned", bestAssigned)

workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)

rmQueue = append(rmQueue, sqi)
Expand Down
4 changes: 2 additions & 2 deletions storage/sealer/sched_assigner_spread_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind

log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)

if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWind
"assigned", bestAssigned)

workerAssigned[bestWid]++
windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)

rmQueue = append(rmQueue, sqi)
Expand Down
4 changes: 2 additions & 2 deletions storage/sealer/sched_assigner_utilization.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)

// TODO: allow bigger windows
if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
if !windows[wnd].Allocated.CanHandleRequest(task.SchedId, task.SealTask(), res, wid, "schedAssign", w.Info) {
continue
}

Expand Down Expand Up @@ -82,7 +82,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
"worker", bestWid,
"utilization", bestUtilization)

workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
workerUtil[bestWid] += windows[selectedWindow].Allocated.Add(task.SchedId, task.SealTask(), info.Resources, needRes)
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)

rmQueue = append(rmQueue, sqi)
Expand Down
5 changes: 3 additions & 2 deletions storage/sealer/sched_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"

Expand Down Expand Up @@ -110,7 +111,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
for i, selected := range candidates {
worker := ps.workers[selected.id]

err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
err := worker.active.withResources(uuid.UUID{}, selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()

Expand Down Expand Up @@ -148,7 +149,7 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
continue
}

if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
if !wr.active.CanHandleRequest(uuid.UUID{}, ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
continue
}

Expand Down
61 changes: 39 additions & 22 deletions storage/sealer/sched_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sealer
import (
"sync"

"github.com/google/uuid"

"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
Expand All @@ -20,7 +22,7 @@ type ActiveResources struct {
}

type taskCounter struct {
taskCounters map[sealtasks.SealTaskType]int
taskCounters map[sealtasks.SealTaskType]map[uuid.UUID]int

// this lock is technically redundant, as ActiveResources is always accessed
// with the worker lock, but let's not panic if we ever change that
Expand All @@ -29,43 +31,57 @@ type taskCounter struct {

func newTaskCounter() *taskCounter {
return &taskCounter{
taskCounters: map[sealtasks.SealTaskType]int{},
taskCounters: make(map[sealtasks.SealTaskType]map[uuid.UUID]int),
}
}

func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
func (tc *taskCounter) Add(tt sealtasks.SealTaskType, schedID uuid.UUID) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]++
tc.getUnlocked(tt)[schedID]++
}

func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
func (tc *taskCounter) Free(tt sealtasks.SealTaskType, schedID uuid.UUID) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]--
m := tc.getUnlocked(tt)
if m[schedID] <= 1 {
delete(m, schedID)
} else {
m[schedID]--
}
}

func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
func (tc *taskCounter) getUnlocked(tt sealtasks.SealTaskType) map[uuid.UUID]int {
if tc.taskCounters[tt] == nil {
tc.taskCounters[tt] = make(map[uuid.UUID]int)
}

return tc.taskCounters[tt]
}

func (tc *taskCounter) Get(tt sealtasks.SealTaskType) map[uuid.UUID]int {
tc.lk.Lock()
defer tc.lk.Unlock()
return tc.taskCounters[tt]

return tc.getUnlocked(tt)
}

func (tc *taskCounter) Sum() int {
tc.lk.Lock()
defer tc.lk.Unlock()
sum := 0
for _, v := range tc.taskCounters {
sum += v
sum += len(v)
}
return sum
}

func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
tc.lk.Lock()
defer tc.lk.Unlock()
for tt, count := range tc.taskCounters {
cb(tt, count)
for tt, v := range tc.taskCounters {
cb(tt, len(v))
}
}

Expand All @@ -75,8 +91,8 @@ func NewActiveResources(tc *taskCounter) *ActiveResources {
}
}

func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
func (a *ActiveResources) withResources(schedID uuid.UUID, id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(schedID, tt, r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
Expand All @@ -85,11 +101,11 @@ func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.Work
a.waiting--
}

a.Add(tt, wr.Resources, r)
a.Add(schedID, tt, wr.Resources, r)

err := cb()

a.Free(tt, wr.Resources, r)
a.Free(schedID, tt, wr.Resources, r)

return err
}
Expand All @@ -100,7 +116,7 @@ func (a *ActiveResources) hasWorkWaiting() bool {
}

// add task resources to ActiveResources and return utilization difference
func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
func (a *ActiveResources) Add(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
startUtil := a.utilization(wr)

if r.GPUUtilization > 0 {
Expand All @@ -109,19 +125,19 @@ func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
a.taskCounters.Add(tt)
a.taskCounters.Add(tt, schedID)

return a.utilization(wr) - startUtil
}

func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
func (a *ActiveResources) Free(schedID uuid.UUID, tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed -= r.GPUUtilization
}
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
a.taskCounters.Free(tt)
a.taskCounters.Free(tt, schedID)

if a.cond != nil {
a.cond.Broadcast()
Expand All @@ -130,9 +146,10 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes

// CanHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
func (a *ActiveResources) CanHandleRequest(schedID uuid.UUID, tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 {
if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
tasks := a.taskCounters.Get(tt)
if len(tasks) >= needRes.MaxConcurrent && (schedID == uuid.UUID{} || tasks[schedID] == 0) {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
return false
}
Expand Down Expand Up @@ -226,7 +243,7 @@ func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
return a.taskCounters.Sum()
}

return a.taskCounters.Get(*tt)
return len(a.taskCounters.Get(*tt))
}

func (wh *WorkerHandle) Utilization() float64 {
Expand Down
4 changes: 2 additions & 2 deletions storage/sealer/sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ func TestWindowCompact(t *testing.T) {
TaskType: task,
Sector: storiface.SectorRef{ProofType: spt},
})
window.Allocated.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
window.Allocated.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}

wh.activeWindows = append(wh.activeWindows, window)
Expand All @@ -717,7 +717,7 @@ func TestWindowCompact(t *testing.T) {

for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
expectRes.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
expectRes.Add(uuid.UUID{}, task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}

require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].Allocated.cpuUse, "%d", wi)
Expand Down
Loading

0 comments on commit 6fd93ed

Please sign in to comment.