sched: Share active/preparing task counters
magik6k committed Feb 28, 2023
1 parent 745476c commit 2316363
Showing 5 changed files with 70 additions and 22 deletions.
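
The diff below makes a worker's `preparing` and `active` resource trackers count in-flight tasks through one shared `taskCounter`, presumably so that per-task-type `MaxConcurrent` limits apply across both pools rather than to each pool independently. A minimal standalone sketch of that idea (simplified types and task-type names, not the lotus scheduler itself):

```go
package main

import "fmt"

// taskLimiter stands in for the shared taskCounter: one map consulted by
// every pool that admits work for a worker.
type taskLimiter struct {
	counts map[string]int
}

func (l *taskLimiter) canAdmit(tt string, maxConcurrent int) bool {
	return l.counts[tt] < maxConcurrent
}

func (l *taskLimiter) add(tt string)  { l.counts[tt]++ }
func (l *taskLimiter) free(tt string) { l.counts[tt]-- }

func main() {
	shared := &taskLimiter{counts: map[string]int{}}
	const limit = 1 // e.g. MaxConcurrent=1 for some task type

	// A task is admitted into the "preparing" pool.
	if shared.canAdmit("PC2", limit) {
		shared.add("PC2")
	}

	// A check made on behalf of the "active" pool consults the same counter,
	// so a second task is rejected instead of slipping past the limit.
	fmt.Println(shared.canAdmit("PC2", limit)) // false
}
```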
2 changes: 1 addition & 1 deletion storage/sealer/sched_assigner_common.go
@@ -58,7 +58,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {

windows := make([]SchedWindow, windowsLen)
for i := range windows {
windows[i].Allocated = *NewActiveResources()
windows[i].Allocated = *NewActiveResources(newTaskCounter())
}
acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex

72 changes: 59 additions & 13 deletions storage/sealer/sched_resources.go
@@ -13,18 +13,68 @@ type ActiveResources struct {
gpuUsed float64
cpuUse uint64

taskCounters map[sealtasks.SealTaskType]int
taskCounters *taskCounter

cond *sync.Cond
waiting int
}

func NewActiveResources() *ActiveResources {
return &ActiveResources{
type taskCounter struct {
taskCounters map[sealtasks.SealTaskType]int

// this lock is technically redundant, as ActiveResources is always accessed
// with the worker lock, but let's not panic if we ever change that
lk sync.Mutex
}

func newTaskCounter() *taskCounter {
return &taskCounter{
taskCounters: map[sealtasks.SealTaskType]int{},
}
}

func (tc *taskCounter) Add(tt sealtasks.SealTaskType) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]++
}

func (tc *taskCounter) Free(tt sealtasks.SealTaskType) {
tc.lk.Lock()
defer tc.lk.Unlock()
tc.taskCounters[tt]--
}

func (tc *taskCounter) Get(tt sealtasks.SealTaskType) int {
tc.lk.Lock()
defer tc.lk.Unlock()
return tc.taskCounters[tt]
}

func (tc *taskCounter) Sum() int {
tc.lk.Lock()
defer tc.lk.Unlock()
sum := 0
for _, v := range tc.taskCounters {
sum += v
}
return sum
}

func (tc *taskCounter) ForEach(cb func(tt sealtasks.SealTaskType, count int)) {
tc.lk.Lock()
defer tc.lk.Unlock()
for tt, count := range tc.taskCounters {
cb(tt, count)
}
}

func NewActiveResources(tc *taskCounter) *ActiveResources {
return &ActiveResources{
taskCounters: tc,
}
}

func (a *ActiveResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
if a.cond == nil {
@@ -59,7 +109,7 @@ func (a *ActiveResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
a.taskCounters[tt]++
a.taskCounters.Add(tt)

return a.utilization(wr) - startUtil
}
@@ -71,7 +121,7 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
a.taskCounters[tt]--
a.taskCounters.Free(tt)

if a.cond != nil {
a.cond.Broadcast()
@@ -82,8 +132,8 @@ func (a *ActiveResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
// handle the request.
func (a *ActiveResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 {
if a.taskCounters[tt] >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
if a.taskCounters.Get(tt) >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters.Get(tt))
return false
}
}
@@ -173,14 +223,10 @@ func (a *ActiveResources) utilization(wr storiface.WorkerResources) float64 { //
func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int {
// nil means all tasks
if tt == nil {
var count int
for _, c := range a.taskCounters {
count += c
}
return count
return a.taskCounters.Sum()
}

return a.taskCounters[*tt]
return a.taskCounters.Get(*tt)
}

func (wh *WorkerHandle) Utilization() float64 {
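
For reference, a hypothetical test exercising the counter methods added above. It would have to live in the `sealer` package since `taskCounter` is unexported; `sealtasks.TTFetch` is used purely as an example task type, and the import paths are assumed from the file layout shown here:

```go
package sealer

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
)

func TestTaskCounterSketch(t *testing.T) {
	tc := newTaskCounter()

	// Two tasks start, one finishes.
	tc.Add(sealtasks.TTFetch)
	tc.Add(sealtasks.TTFetch)
	tc.Free(sealtasks.TTFetch)

	require.Equal(t, 1, tc.Get(sealtasks.TTFetch))
	require.Equal(t, 1, tc.Sum())

	// ForEach visits each task type with its current count.
	tc.ForEach(func(tt sealtasks.SealTaskType, count int) {
		require.Equal(t, sealtasks.TTFetch, tt)
		require.Equal(t, 1, count)
	})
}
```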
8 changes: 4 additions & 4 deletions storage/sealer/sched_test.go
@@ -639,8 +639,8 @@ func BenchmarkTrySched(b *testing.B) {
Resources: decentWorkerResources,
},
Enabled: true,
preparing: NewActiveResources(),
active: NewActiveResources(),
preparing: NewActiveResources(newTaskCounter()),
active: NewActiveResources(newTaskCounter()),
}

for i := 0; i < windows; i++ {
@@ -685,7 +685,7 @@ func TestWindowCompact(t *testing.T) {

for _, windowTasks := range start {
window := &SchedWindow{
Allocated: *NewActiveResources(),
Allocated: *NewActiveResources(newTaskCounter()),
}

for _, task := range windowTasks {
@@ -708,7 +708,7 @@ func TestWindowCompact(t *testing.T) {
require.Equal(t, len(start)-len(expect), -sw.windowsRequested)

for wi, tasks := range expect {
expectRes := NewActiveResources()
expectRes := NewActiveResources(newTaskCounter())

for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
6 changes: 4 additions & 2 deletions storage/sealer/sched_worker.go
@@ -30,12 +30,14 @@ func newWorkerHandle(ctx context.Context, w Worker) (*WorkerHandle, error) {
return nil, xerrors.Errorf("getting worker info: %w", err)
}

tc := newTaskCounter()

worker := &WorkerHandle{
workerRpc: w,
Info: info,

preparing: NewActiveResources(),
active: NewActiveResources(),
preparing: NewActiveResources(tc),
active: NewActiveResources(tc),
Enabled: true,

closingMgr: make(chan struct{}),
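
This file carries the key wiring: `newWorkerHandle` now builds a single `taskCounter` and hands it to both the `preparing` and `active` pools, so a count taken through one pool is visible to limit checks made through the other. A standalone sketch of that pointer sharing (simplified types, not the lotus code):

```go
package main

import "fmt"

type counter struct {
	counts map[string]int
}

// pool mirrors ActiveResources holding a pointer to the shared counter.
type pool struct {
	tasks *counter
}

func (p *pool) add(tt string)       { p.tasks.counts[tt]++ }
func (p *pool) count(tt string) int { return p.tasks.counts[tt] }

func main() {
	tc := &counter{counts: map[string]int{}}

	// Both pools of one worker reference the same counter instance.
	preparing := &pool{tasks: tc}
	active := &pool{tasks: tc}

	preparing.add("AP") // a task admitted while preparing

	// The active pool observes it immediately, without any copying.
	fmt.Println(active.count("AP")) // 1
}
```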
4 changes: 2 additions & 2 deletions storage/sealer/stats.go
@@ -43,9 +43,9 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
TaskCounts: map[string]int{},
}

for tt, count := range handle.active.taskCounters {
handle.active.taskCounters.ForEach(func(tt sealtasks.SealTaskType, count int) {
out[uuid.UUID(id)].TaskCounts[tt.String()] = count
}
})

handle.lk.Unlock()
}
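
stats.go switches from ranging over the raw map to the `ForEach` accessor, which holds the counter's mutex for the whole iteration. A tiny standalone sketch of that callback pattern (hypothetical names, not the lotus types):

```go
package main

import (
	"fmt"
	"sync"
)

type statsCounter struct {
	lk     sync.Mutex
	counts map[string]int
}

// forEach holds the lock while iterating, so callers never observe a map
// that is being mutated concurrently.
func (c *statsCounter) forEach(cb func(tt string, count int)) {
	c.lk.Lock()
	defer c.lk.Unlock()
	for tt, count := range c.counts {
		cb(tt, count)
	}
}

func main() {
	c := &statsCounter{counts: map[string]int{"GET": 2, "PC1": 1}}

	out := map[string]int{}
	c.forEach(func(tt string, count int) {
		out[tt] = count // copy into a stats snapshot, as WorkerStats does
	})

	fmt.Println(out)
}
```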
