diff --git a/executor.go b/executor.go index 5cdd2abb..8386b34e 100644 --- a/executor.go +++ b/executor.go @@ -86,7 +86,9 @@ func (jf *jobFunction) singletonRunner() { case <-jf.ctx.Done(): jf.singletonWg.Done() jf.singletonRunnerOn.Store(false) + jf.singletonQueueMu.Lock() jf.singletonQueue = make(chan struct{}, 1000) + jf.singletonQueueMu.Unlock() jf.stopped.Store(false) return case <-jf.singletonQueue: @@ -166,7 +168,9 @@ func (e *executor) runJob(f jobFunction) { if !f.singletonRunnerOn.Load() { go f.singletonRunner() } + f.singletonQueueMu.Lock() f.singletonQueue <- struct{}{} + f.singletonQueueMu.Unlock() } } diff --git a/job.go b/job.go index 30fb69e1..3c55b2ae 100644 --- a/job.go +++ b/job.go @@ -49,6 +49,7 @@ type jobFunction struct { jobName string // key of the distributed lock funcName string // the name of the function - e.g. main.func1 runConfig runConfig // configuration for how many times to run the job + singletonQueueMu *sync.Mutex // mutex for singletonQueue singletonQueue chan struct{} // queues jobs for the singleton runner to handle singletonRunnerOn *atomic.Bool // whether the runner function for singleton is running ctx context.Context // for cancellation @@ -80,6 +81,7 @@ func (jf *jobFunction) copy() jobFunction { jobName: jf.jobName, runConfig: jf.runConfig, singletonQueue: jf.singletonQueue, + singletonQueueMu: jf.singletonQueueMu, ctx: jf.ctx, cancel: jf.cancel, isRunning: jf.isRunning, @@ -421,6 +423,9 @@ func (j *Job) SingletonMode() { defer j.mu.Unlock() j.runConfig.mode = singletonMode j.jobFunction.singletonWg = &sync.WaitGroup{} + j.singletonQueueMu = &sync.Mutex{} + j.singletonQueueMu.Lock() + defer j.singletonQueueMu.Unlock() j.jobFunction.singletonQueue = make(chan struct{}, 100) }