-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkerpool.go
139 lines (116 loc) · 2.83 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package conc
import (
"context"
"sync"
)
// WorkerPoolMetrics is called for different events in the orchestrator.
type WorkerPoolMetrics interface {
Incr(n uint)
Decr(n uint)
Restart()
}
// nilMetric is the default WorkerPoolMetrics if none other is set.
type nilMetric struct{}
func (n *nilMetric) Incr(x uint) {}
func (n *nilMetric) Decr(x uint) {}
func (n *nilMetric) Restart() {}
// WorkerPool keeps track of current running processes. It starts and stops them.
type WorkerPool struct {
metrics WorkerPoolMetrics
run Runner
rep Reporter
stopper chan struct{}
// number of processes currently running. Must be modified while taking
// actualNL.
actualN uint
actualNL *sync.Cond
wantedN uint
}
type WorkerPoolOpts func(*WorkerPool)
func WithMetrics(metrics WorkerPoolMetrics) WorkerPoolOpts {
return func(o *WorkerPool) {
o.metrics = metrics
}
}
// NewWorkerPool creates an WorkerPool. The orchestrator starts with
// WantedN set to zero. Call Stop(...) to properly clean up after usage.
func NewWorkerPool(r Runner, re Reporter, opts ...WorkerPoolOpts) *WorkerPool {
res := &WorkerPool{
&nilMetric{},
r,
re,
make(chan struct{}),
0,
sync.NewCond(&sync.Mutex{}),
0,
}
for _, o := range opts {
o(res)
}
return res
}
// ActualN returns the number of processes currently running.
func (o *WorkerPool) ActualN() uint {
o.actualNL.L.Lock()
defer o.actualNL.L.Unlock()
return o.actualN
}
// WantedN returns the number of processes we want running.
func (o *WorkerPool) WantedN() uint {
return uint(o.wantedN)
}
// Incr increases the number of running processes. To wait for them to have
// shut down, call SettleDown().
func (o *WorkerPool) Incr(n uint) {
o.wantedN += n
o.actualNL.L.Lock()
o.actualN += n
o.actualNL.L.Unlock()
o.actualNL.Broadcast()
var i uint
for i = 0; i < n; i++ {
go o.runProcess()
}
}
func (o *WorkerPool) runProcess() {
o.run.Start(o.stopper, o.rep)
o.actualNL.L.Lock()
o.actualN--
o.actualNL.L.Unlock()
o.actualNL.Broadcast()
}
// Decr reduces the number of running processes. They will be closed async.
// To wait for them to have shut down, call SettleDown().
func (o *WorkerPool) Decr(n uint) {
o.wantedN -= n
if o.wantedN < 1 {
// Can't have zero of negative number of processes.
o.wantedN = 0
}
go func() {
var i uint
for i = 0; i < n; i++ {
o.stopper <- struct{}{}
}
}()
}
// Settle waits for WantedN to be the same as ActualN.
func (o *WorkerPool) SettleDown(ctx context.Context) {
// Consider using https://github.com/JensRantil/go-sync. Would likely simplify code.
localCtx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-localCtx.Done()
if ctx.Err() != nil {
o.actualNL.Broadcast()
}
}()
o.actualNL.L.Lock()
defer o.actualNL.L.Unlock()
for o.actualN != o.wantedN {
if ctx.Err() != nil {
return
}
o.actualNL.Wait()
}
}