Skip to content

Commit

Permalink
protect the job function's waitgroup with a mutex to avoid race condi…
Browse files Browse the repository at this point in the history
…tions (#512)
  • Loading branch information
JohnRoesler authored Jun 9, 2023
1 parent 272915a commit fed28e8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
10 changes: 8 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ func runJob(f jobFunction) {

func (jf *jobFunction) singletonRunner() {
jf.singletonRunnerOn.Store(true)
jf.singletonWgMu.Lock()
jf.singletonWg.Add(1)
jf.singletonWgMu.Unlock()
for {
select {
case <-jf.ctx.Done():
Expand Down Expand Up @@ -163,7 +165,7 @@ func (e *executor) runJob(f jobFunction) {
}
runJob(f)
case singletonMode:
e.singletonWgs.Store(f.singletonWg, struct{}{})
e.singletonWgs.Store(f.singletonWg, f.singletonWgMu)

if !f.singletonRunnerOn.Load() {
go f.singletonRunner()
Expand Down Expand Up @@ -243,8 +245,12 @@ func (e *executor) stop() {
e.wg.Wait()
if e.singletonWgs != nil {
e.singletonWgs.Range(func(key, value interface{}) bool {
if wg, ok := key.(*sync.WaitGroup); ok {
wg, wgOk := key.(*sync.WaitGroup)
mu, muOk := value.(*sync.Mutex)
if wgOk && muOk {
mu.Lock()
wg.Wait()
mu.Unlock()
}
return true
})
Expand Down
13 changes: 10 additions & 3 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type jobFunction struct {
runStartCount *atomic.Int64 // number of times the job was started
runFinishCount *atomic.Int64 // number of times the job was finished
singletonWg *sync.WaitGroup // used by singleton runner
singletonWgMu *sync.Mutex // use to protect the singletonWg
stopped *atomic.Bool // tracks whether the job is currently stopped
jobFuncNextRun time.Time // the next time the job is scheduled to run
}
Expand Down Expand Up @@ -88,6 +89,7 @@ func (jf *jobFunction) copy() jobFunction {
runStartCount: jf.runStartCount,
runFinishCount: jf.runFinishCount,
singletonWg: jf.singletonWg,
singletonWgMu: jf.singletonWgMu,
singletonRunnerOn: jf.singletonRunnerOn,
stopped: jf.stopped,
jobFuncNextRun: jf.jobFuncNextRun,
Expand Down Expand Up @@ -422,11 +424,16 @@ func (j *Job) SingletonMode() {
j.mu.Lock()
defer j.mu.Unlock()
j.runConfig.mode = singletonMode

j.jobFunction.singletonWgMu = &sync.Mutex{}
j.jobFunction.singletonWgMu.Lock()
j.jobFunction.singletonWg = &sync.WaitGroup{}
j.singletonQueueMu = &sync.Mutex{}
j.singletonQueueMu.Lock()
defer j.singletonQueueMu.Unlock()
j.jobFunction.singletonWgMu.Unlock()

j.jobFunction.singletonQueueMu = &sync.Mutex{}
j.jobFunction.singletonQueueMu.Lock()
j.jobFunction.singletonQueue = make(chan struct{}, 100)
j.jobFunction.singletonQueueMu.Unlock()
}

// shouldRun evaluates if this job should run again
Expand Down

0 comments on commit fed28e8

Please sign in to comment.