diff --git a/scheduler.go b/scheduler.go index 38a9c39..497a150 100644 --- a/scheduler.go +++ b/scheduler.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/jpillora/backoff" "gopkg.in/src-d/go-errors.v1" ) @@ -26,12 +27,18 @@ type jobScheduler struct { schedule JobScheduleFn cancel chan struct{} opts *WorkerPoolOpts + backoff *backoff.Backoff } const ( schedCapacity = 1000 - jobTimeout = 3 * time.Second - newJobTimeout = 30 * time.Second + schedTimeout = 5 * time.Second + + // backoff default configuration + backoffMinDuration = 250 * time.Millisecond + backoffMaxDuration = 1024 * time.Second + backoffFactor = 2 + backoffJitter = true ) func newJobScheduler( @@ -42,12 +49,8 @@ func newJobScheduler( opts.SchedulerCapacity = schedCapacity } - if opts.WaitJobTimeout <= 0 { - opts.WaitJobTimeout = jobTimeout - } - - if opts.WaitNewJobTimeout <= 0 { - opts.WaitNewJobTimeout = newJobTimeout + if opts.ScheduleJobTimeout <= 0 { + opts.ScheduleJobTimeout = schedTimeout } return &jobScheduler{ @@ -55,6 +58,12 @@ func newJobScheduler( schedule: schedule, cancel: make(chan struct{}), opts: opts, + backoff: &backoff.Backoff{ + Min: backoffMinDuration, + Max: backoffMaxDuration, + Factor: backoffFactor, + Jitter: backoffJitter, + }, } } @@ -63,6 +72,7 @@ func (s *jobScheduler) finish() { } func (s *jobScheduler) Schedule() { + s.backoff.Reset() for { select { case <-s.cancel: @@ -70,7 +80,7 @@ func (s *jobScheduler) Schedule() { default: ctx, cancel := context.WithTimeout( context.Background(), - s.opts.WaitJobTimeout, + s.opts.ScheduleJobTimeout, ) defer cancel() @@ -84,8 +94,7 @@ func (s *jobScheduler) Schedule() { select { case <-s.cancel: return - case <-time.After( - s.opts.WaitNewJobTimeout): + case <-time.After(s.backoff.Duration()): } } @@ -99,6 +108,7 @@ func (s *jobScheduler) Schedule() { select { case s.jobs <- job: + s.backoff.Reset() s.opts.Metrics.Discover(job) case <-s.cancel: return diff --git a/worker_pool.go b/worker_pool.go index 1903a2c..d2b27d6 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -7,11 +7,10 @@ import ( // WorkerPoolOpts are configuration options for a JobScheduler. type WorkerPoolOpts struct { - SchedulerCapacity int - WaitJobTimeout time.Duration - WaitNewJobTimeout time.Duration - NotWaitNewJobs bool - Metrics MetricsCollector + SchedulerCapacity int + ScheduleJobTimeout time.Duration + NotWaitNewJobs bool + Metrics MetricsCollector } // WorkerPool holds a pool of workers to process Jobs.