diff --git a/go.mod b/go.mod index 2906df4d9..384ef0d07 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/gorilla/sessions v1.2.1 github.com/monoculum/formam v3.5.5+incompatible github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef + github.com/robfig/cron v1.2.0 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.5.0 github.com/stretchr/testify v1.8.0 diff --git a/go.sum b/go.sum index 9ae535bb9..e99057055 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef h1:NKxTG6GVGbfMXc2mIk+KphcH6hagbVXhcFkbTgYleTI= github.com/psanford/memfs v0.0.0-20210214183328-a001468d78ef/go.mod h1:tcaRap0jS3eifrEEllL6ZMd9dg8IlDpi2S1oARrQ+NI= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/worker/simple.go b/worker/simple.go index 55227342e..1e0efceb8 100644 --- a/worker/simple.go +++ b/worker/simple.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/robfig/cron" "github.com/sirupsen/logrus" ) @@ -34,6 +35,7 @@ func NewSimpleWithContext(ctx context.Context) *Simple { Logger: l, ctx: ctx, cancel: cancel, + cron: cron.New(), handlers: map[string]Handler{}, moot: &sync.Mutex{}, started: false, @@ -46,6 +48,7 @@ type Simple struct { Logger SimpleLogger ctx context.Context cancel context.CancelFunc + cron *cron.Cron handlers map[string]Handler moot *sync.Mutex wg sync.WaitGroup @@ -76,6 +79,8 @@ func (w *Simple) Start(ctx context.Context) error { defer w.moot.Unlock() w.ctx, w.cancel = context.WithCancel(ctx) + w.cron.Start() + w.started = true return nil } @@ -89,6 +94,7 @@ func (w *Simple) Stop() error { w.Logger.Info("stopping Simple background worker") w.cancel() + w.cron.Stop() w.wg.Wait() w.Logger.Info("all background jobs stopped completely") @@ -196,6 +202,30 @@ func (w *Simple) PerformIn(job Job, d time.Duration) error { return nil } +// RegisterPeriodic registers a job to be periodically executed accroding to the given cron specification +func (w *Simple) RegisterPeriodic(cronSpec, name string, h Handler) error { + if name == "" || h == nil { + return fmt.Errorf("name or handler cannot be empty/nil") + } + + w.moot.Lock() + defer w.moot.Unlock() + if _, ok := w.handlers[name]; ok { + return fmt.Errorf("handler already mapped for name %s", name) + } + w.handlers[name] = h + + w.cron.AddFunc(cronSpec, func() { + w.Perform(Job{ + Queue: "system_cron", + Handler: name, + Args: Args{}, + }) + }) + + return nil +} + // SimpleLogger is used by the Simple worker to write logs type SimpleLogger interface { Debugf(string, ...interface{}) diff --git a/worker/worker.go b/worker/worker.go index 36fc0262a..ee7cc369c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -24,6 +24,8 @@ type Worker interface { PerformIn(Job, time.Duration) error // Register a Handler Register(string, Handler) error + // RegisterPeriodic performs a job periodically according to the provided cron spec + RegisterPeriodic(cronSpec, jobName string, h Handler) error } /* TODO(sio4): #road-to-v1 - redefine Worker interface clearer