diff --git a/worker/simple.go b/worker/simple.go index 3f5788d40..76d9ef673 100644 --- a/worker/simple.go +++ b/worker/simple.go @@ -15,6 +15,9 @@ var _ Worker = &Simple{} // NewSimple creates a basic implementation of the Worker interface // that is backed using just the standard library and goroutines. func NewSimple() *Simple { + // TODO(sio4): #road-to-v1 - how to check if the worker is ready to work + // when worker should be initialized? how to check if worker is ready? + // and purpose of the context return NewSimpleWithContext(context.Background()) } @@ -49,6 +52,10 @@ type Simple struct { // Register Handler with the worker func (w *Simple) Register(name string, h Handler) error { + if name == "" || h == nil { + return fmt.Errorf("name or handler cannot be empty/nil") + } + w.moot.Lock() defer w.moot.Unlock() if _, ok := w.handlers[name]; ok { @@ -60,6 +67,7 @@ func (w *Simple) Register(name string, h Handler) error { // Start the worker func (w *Simple) Start(ctx context.Context) error { + // TODO(sio4): #road-to-v1 - define the purpose of Start clearly w.Logger.Info("starting Simple background worker") w.ctx, w.cancel = context.WithCancel(ctx) @@ -68,20 +76,30 @@ func (w *Simple) Start(ctx context.Context) error { // Stop the worker func (w *Simple) Stop() error { + // prevent job submission when stopping + w.moot.Lock() + defer w.moot.Unlock() + w.Logger.Info("stopping Simple background worker") + w.cancel() + w.wg.Wait() w.Logger.Info("all background jobs stopped completely") - w.cancel() return nil } // Perform a job as soon as possibly using a goroutine. func (w *Simple) Perform(job Job) error { + // Perform should not allow a job submission if the worker is not running + if err := w.ctx.Err(); err != nil { + return fmt.Errorf("worker is not ready to perform a job: %v", err) + } + w.Logger.Debugf("performing job %s", job) if job.Handler == "" { - err := fmt.Errorf("no handler name given for %s", job) + err := fmt.Errorf("no handler name given: %s", job) w.Logger.Error(err) return err } @@ -89,7 +107,7 @@ func (w *Simple) Perform(job Job) error { w.moot.Lock() defer w.moot.Unlock() if h, ok := w.handlers[job.Handler]; ok { - // TODO: consider to implement timeout and/or cancellation + // TODO(sio4): #road-to-v1 - consider timeout and/or cancellation w.wg.Add(1) go func() { defer w.wg.Done() @@ -118,11 +136,20 @@ func (w *Simple) PerformAt(job Job, t time.Time) error { // PerformIn performs a job after waiting for a specified amount // using a goroutine. func (w *Simple) PerformIn(job Job, d time.Duration) error { + // Perform should not allow a job submission if the worker is not running + if err := w.ctx.Err(); err != nil { + return fmt.Errorf("worker is not ready to perform a job: %v", err) + } + + w.wg.Add(1) // waiting job also should be counted go func() { + defer w.wg.Done() + select { case <-time.After(d): w.Perform(job) case <-w.ctx.Done(): + // TODO(sio4): #road-to-v1 - it should be guaranteed to be performed w.cancel() } }() diff --git a/worker/simple_test.go b/worker/simple_test.go index 6372d60ca..7abf0b499 100644 --- a/worker/simple_test.go +++ b/worker/simple_test.go @@ -1,6 +1,7 @@ package worker import ( + "context" "sync" "testing" "time" @@ -8,76 +9,188 @@ import ( "github.com/stretchr/testify/require" ) +func sampleHandler(Args) error { + return nil +} + +func Test_Simple_RegisterEmpty(t *testing.T) { + r := require.New(t) + + w := NewSimple() + err := w.Register("", sampleHandler) + r.Error(err) +} + +func Test_Simple_RegisterNil(t *testing.T) { + r := require.New(t) + + w := NewSimple() + err := w.Register("sample", nil) + r.Error(err) +} + +func Test_Simple_RegisterEmptyNil(t *testing.T) { + r := require.New(t) + + w := NewSimple() + err := w.Register("", nil) + r.Error(err) +} + +func Test_Simple_RegisterExisting(t *testing.T) { + r := require.New(t) + + w := NewSimple() + err := w.Register("sample", sampleHandler) + r.NoError(err) + + err = w.Register("sample", sampleHandler) + r.Error(err) +} + +func Test_Simple_StartStop(t *testing.T) { + r := require.New(t) + + w := NewSimple() + ctx := context.Background() + err := w.Start(ctx) + r.NoError(err) + r.NotNil(w.ctx) + r.Nil(w.ctx.Err()) + + err = w.Stop() + r.NoError(err) + r.NotNil(w.ctx) + r.NotNil(w.ctx.Err()) +} + func Test_Simple_Perform(t *testing.T) { r := require.New(t) var hit bool - wg := &sync.WaitGroup{} - wg.Add(1) w := NewSimple() + r.NoError(w.Start(context.Background())) + w.Register("x", func(Args) error { hit = true - wg.Done() return nil }) w.Perform(Job{ Handler: "x", }) - wg.Wait() + + // the worker should guarantee the job is finished before the worker stopped + r.NoError(w.Stop()) r.True(hit) } -func Test_Simple_PerformAt(t *testing.T) { +func Test_Simple_PerformBroken(t *testing.T) { r := require.New(t) var hit bool - wg := &sync.WaitGroup{} - wg.Add(1) w := NewSimple() + r.NoError(w.Start(context.Background())) + w.Register("x", func(Args) error { hit = true - wg.Done() + + //Index out of bounds on purpose + println([]string{}[0]) + return nil }) - w.PerformAt(Job{ + w.Perform(Job{ Handler: "x", - }, time.Now().Add(5*time.Millisecond)) - wg.Wait() + }) + + r.NoError(w.Stop()) r.True(hit) } -func Test_Simple_PerformBroken(t *testing.T) { +func Test_Simple_PerformWithEmptyJob(t *testing.T) { + r := require.New(t) + + w := NewSimple() + r.NoError(w.Start(context.Background())) + defer w.Stop() + + err := w.Perform(Job{}) + r.Error(err) +} + +func Test_Simple_PerformWithUnknownJob(t *testing.T) { + r := require.New(t) + + w := NewSimple() + r.NoError(w.Start(context.Background())) + defer w.Stop() + + err := w.Perform(Job{Handler: "unknown"}) + r.Error(err) +} + +/* TODO(sio4): #road-to-v1 - define the purpose of Start clearly + consider to make Perform to work only when the worker is started. +func Test_Simple_PerformBeforeStart(t *testing.T) { + r := require.New(t) + + w := NewSimple() + r.NoError(w.Register("sample", sampleHandler)) + + err := w.Perform(Job{Handler: "sample"}) + r.Error(err) +} +*/ + +func Test_Simple_PerformAfterStop(t *testing.T) { + r := require.New(t) + + w := NewSimple() + r.NoError(w.Register("sample", sampleHandler)) + r.NoError(w.Start(context.Background())) + r.NoError(w.Stop()) + + err := w.Perform(Job{Handler: "sample"}) + r.Error(err) +} + +func Test_Simple_PerformAt(t *testing.T) { r := require.New(t) var hit bool + w := NewSimple() + r.NoError(w.Start(context.Background())) + wg := &sync.WaitGroup{} wg.Add(1) - w := NewSimple() w.Register("x", func(Args) error { hit = true wg.Done() - - //Index out of bounds on purpose - println([]string{}[0]) - return nil }) - - w.Perform(Job{ + w.PerformAt(Job{ Handler: "x", - }) + }, time.Now().Add(5*time.Millisecond)) + + // how long does the handler take for assignment? hmm, + time.Sleep(100 * time.Millisecond) wg.Wait() r.True(hit) + + r.NoError(w.Stop()) } func Test_Simple_PerformIn(t *testing.T) { r := require.New(t) var hit bool + w := NewSimple() + r.NoError(w.Start(context.Background())) + wg := &sync.WaitGroup{} wg.Add(1) - w := NewSimple() + w.Register("x", func(Args) error { hit = true wg.Done() @@ -86,14 +199,61 @@ func Test_Simple_PerformIn(t *testing.T) { w.PerformIn(Job{ Handler: "x", }, 5*time.Millisecond) + + // how long does the handler take for assignment? hmm, + time.Sleep(100 * time.Millisecond) wg.Wait() r.True(hit) + + r.NoError(w.Stop()) } -func Test_Simple_NoHandler(t *testing.T) { +/* TODO(sio4): #road-to-v1 - define the purpose of Start clearly + consider to make Perform to work only when the worker is started. +func Test_Simple_PerformInBeforeStart(t *testing.T) { r := require.New(t) w := NewSimple() - err := w.Perform(Job{}) + r.NoError(w.Register("sample", sampleHandler)) + + err := w.PerformIn(Job{Handler: "sample"}, 5*time.Millisecond) + r.Error(err) +} +*/ + +func Test_Simple_PerformInAfterStop(t *testing.T) { + r := require.New(t) + + w := NewSimple() + r.NoError(w.Register("sample", sampleHandler)) + r.NoError(w.Start(context.Background())) + r.NoError(w.Stop()) + + err := w.PerformIn(Job{Handler: "sample"}, 5*time.Millisecond) r.Error(err) } + +/* TODO(sio4): #road-to-v1 - it should be guaranteed to be performed + consider to make PerformIn to guarantee the job execution +func Test_Simple_PerformInFollowedByStop(t *testing.T) { + r := require.New(t) + + var hit bool + w := NewSimple() + r.NoError(w.Start(context.Background())) + + w.Register("x", func(Args) error { + hit = true + return nil + }) + err := w.PerformIn(Job{ + Handler: "x", + }, 5*time.Millisecond) + r.NoError(err) + + // stop the worker immediately after PerformIn + r.NoError(w.Stop()) + + r.True(hit) +} +*/