Skip to content

Commit

Permalink
add new RegisterEventListeners using new EventListener type (#517)
Browse files Browse the repository at this point in the history
* add new RegisterEventListeners using new EventListener type

* add SetEventListeners to scheduler to set for all jobs

* Update job.go
  • Loading branch information
JohnRoesler authored Jun 21, 2023
1 parent 8db49d2 commit ffa4c91
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 15 deletions.
61 changes: 61 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,33 @@ func ExampleJob_PreviousRun() {
fmt.Println("Previous run:", job.PreviousRun())
}

func ExampleJob_RegisterEventListeners() {
s := gocron.NewScheduler(time.UTC)

job, _ := s.Every("1s").Name("my_func").Do(func() error { return fmt.Errorf("error") })
job.RegisterEventListeners(
gocron.AfterJobRuns(func(jobName string) {
fmt.Printf("afterJobRuns: %s\n", jobName)
}),
gocron.BeforeJobRuns(func(jobName string) {
fmt.Printf("beforeJobRuns: %s\n", jobName)
}),
gocron.WhenJobReturnsError(func(jobName string, err error) {
fmt.Printf("whenJobReturnsError: %s, %v\n", jobName, err)
}),
gocron.WhenJobReturnsNoError(func(jobName string) {
fmt.Printf("whenJobReturnsNoError: %s\n", jobName)
}),
)
s.StartAsync()
time.Sleep(100 * time.Millisecond)
s.Stop()
// Output:
// beforeJobRuns: my_func
// whenJobReturnsError: my_func, error
// afterJobRuns: my_func
}

func ExampleJob_RunCount() {
s := gocron.NewScheduler(time.UTC)
job, _ := s.Every(1).Second().Do(task)
Expand Down Expand Up @@ -596,6 +623,40 @@ func ExampleScheduler_NextRun() {
// 10:30
}

func ExampleScheduler_RegisterEventListeners() {
s := gocron.NewScheduler(time.UTC)

s.Every("1s").Name("my_func_1").Do(func() error { return fmt.Errorf("error_1") })
s.Every("1s").Name("my_func_2").
StartAt(time.Now().UTC().Add(50 * time.Millisecond)).
Do(func() error { return fmt.Errorf("error_2") })

s.RegisterEventListeners(
gocron.AfterJobRuns(func(jobName string) {
fmt.Printf("afterJobRuns: %s\n", jobName)
}),
gocron.BeforeJobRuns(func(jobName string) {
fmt.Printf("beforeJobRuns: %s\n", jobName)
}),
gocron.WhenJobReturnsError(func(jobName string, err error) {
fmt.Printf("whenJobReturnsError: %s, %v\n", jobName, err)
}),
gocron.WhenJobReturnsNoError(func(jobName string) {
fmt.Printf("whenJobReturnsNoError: %s\n", jobName)
}),
)
s.StartAsync()
time.Sleep(120 * time.Millisecond)
s.Stop()
// Output:
// beforeJobRuns: my_func_1
// whenJobReturnsError: my_func_1, error_1
// afterJobRuns: my_func_1
// beforeJobRuns: my_func_2
// whenJobReturnsError: my_func_2, error_2
// afterJobRuns: my_func_2
}

func ExampleScheduler_Remove() {
s := gocron.NewScheduler(time.UTC)
_, _ = s.Every(1).Week().Do(task)
Expand Down
9 changes: 8 additions & 1 deletion executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ func runJob(f jobFunction) {
f.runStartCount.Add(1)
f.isRunning.Store(true)
callJobFunc(f.eventListeners.onBeforeJobExecution)
callJobFuncWithParams(f.function, f.parameters)
_ = callJobFuncWithParams(f.eventListeners.beforeJobRuns, []interface{}{f.getName()})
err := callJobFuncWithParams(f.function, f.parameters)
if err != nil {
_ = callJobFuncWithParams(f.eventListeners.onError, []interface{}{f.getName(), err})
} else {
_ = callJobFuncWithParams(f.eventListeners.noError, []interface{}{f.getName()})
}
_ = callJobFuncWithParams(f.eventListeners.afterJobRuns, []interface{}{f.getName()})
callJobFunc(f.eventListeners.onAfterJobExecution)
f.isRunning.Store(false)
f.runFinishCount.Add(1)
Expand Down
27 changes: 22 additions & 5 deletions gocron.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,38 @@ const (
)

func callJobFunc(jobFunc interface{}) {
if jobFunc != nil {
reflect.ValueOf(jobFunc).Call([]reflect.Value{})
if jobFunc == nil {
return
}
f := reflect.ValueOf(jobFunc)
if !f.IsZero() {
f.Call([]reflect.Value{})
}
}

func callJobFuncWithParams(jobFunc interface{}, params []interface{}) {
func callJobFuncWithParams(jobFunc interface{}, params []interface{}) error {
if jobFunc == nil {
return nil
}
f := reflect.ValueOf(jobFunc)
if f.IsZero() {
return nil
}
if len(params) != f.Type().NumIn() {
return
return nil
}
in := make([]reflect.Value, len(params))
for k, param := range params {
in[k] = reflect.ValueOf(param)
}
f.Call(in)
vals := f.Call(in)
for _, val := range vals {
i := val.Interface()
if err, ok := i.(error); ok {
return err
}
}
return nil
}

func getFunctionName(fn interface{}) string {
Expand Down
67 changes: 64 additions & 3 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ type jobFunction struct {
}

type eventListeners struct {
onBeforeJobExecution interface{} // performs before job executing
onAfterJobExecution interface{} // performs after job executing
onAfterJobExecution interface{} // deprecated
onBeforeJobExecution interface{} // deprecated
beforeJobRuns func(jobName string) // called before the job executes
afterJobRuns func(jobName string) // called after the job executes
onError func(jobName string, err error) // called when the job returns an error
noError func(jobName string) // called when no error is returned
}

type jobMutex struct {
Expand Down Expand Up @@ -98,6 +102,13 @@ func (jf *jobFunction) copy() jobFunction {
return cp
}

func (jf *jobFunction) getName() string {
if jf.jobName != "" {
return jf.jobName
}
return jf.funcName
}

type runConfig struct {
finiteRuns bool
maxRuns int
Expand Down Expand Up @@ -344,7 +355,57 @@ func (j *Job) Tags() []string {
return j.tags
}

// SetEventListeners accepts two functions that will be called, one before and one after the job is run
// EventListener functions utilize the job's name and are triggered
// by or in the condition that the name suggests
type EventListener func(j *Job)

// BeforeJobRuns is called before the job is run
func BeforeJobRuns(eventListenerFunc func(jobName string)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.beforeJobRuns = eventListenerFunc
}
}

// AfterJobRuns is called after the job is run
// This is called even when an error is returned
func AfterJobRuns(eventListenerFunc func(jobName string)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.afterJobRuns = eventListenerFunc
}
}

// WhenJobReturnsError is called when the job returns an error
func WhenJobReturnsError(eventListenerFunc func(jobName string, err error)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.onError = eventListenerFunc
}
}

// WhenJobReturnsNoError is called when the job does not return an error
// the function must accept a single parameter, which is an error
func WhenJobReturnsNoError(eventListenerFunc func(jobName string)) EventListener {
return func(j *Job) {
j.mu.Lock()
defer j.mu.Unlock()
j.eventListeners.noError = eventListenerFunc
}
}

// RegisterEventListeners accepts EventListeners and registers them for the job
// The event listeners are then called at the times described by each listener.
func (j *Job) RegisterEventListeners(eventListeners ...EventListener) {
for _, el := range eventListeners {
el(j)
}
}

// Deprecated: SetEventListeners accepts two functions that will be called, one before and one after the job is run
func (j *Job) SetEventListeners(onBeforeJobExecution interface{}, onAfterJobExecution interface{}) {
j.eventListeners = eventListeners{
onBeforeJobExecution: onBeforeJobExecution,
Expand Down
42 changes: 39 additions & 3 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gocron
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -234,23 +235,54 @@ func TestJob_CommonExports(t *testing.T) {
func TestJob_SetEventListeners(t *testing.T) {
t.Run("run event listeners callbacks for a job", func(t *testing.T) {
var (
jobRanPassed = false
beforeCallbackPassed = false
afterCallbackPassed = false
jobRanPassed bool
beforeCallbackPassed bool
afterCallbackPassed bool
beforeJobCallback bool
afterJobCallback bool
onErrorCallback bool
noErrorCallback bool
wg = &sync.WaitGroup{}
)
wg.Add(1)
s := NewScheduler(time.UTC)
job, err := s.Tag("tag1").Every("100ms").Do(func() {
jobRanPassed = true
})
require.NoError(t, err)
job.SetEventListeners(func() {
beforeCallbackPassed = true
}, func() {
defer wg.Done()
afterCallbackPassed = true
})

job2, err := s.Every("100ms").Do(func() error { return fmt.Errorf("failed") })
require.NoError(t, err)
wg.Add(1)
job2.RegisterEventListeners(
AfterJobRuns(func(_ string) {
afterJobCallback = true
wg.Done()
}),
BeforeJobRuns(func(_ string) {
beforeJobCallback = true
}),
WhenJobReturnsError(func(_ string, _ error) {
onErrorCallback = true
}),
)

job3, err := s.Every("100ms").Do(func() {})
require.NoError(t, err)
wg.Add(1)
job3.RegisterEventListeners(
WhenJobReturnsNoError(func(_ string) {
noErrorCallback = true
wg.Done()
}),
)

s.StartAsync()
wg.Wait()
s.Stop()
Expand All @@ -259,6 +291,10 @@ func TestJob_SetEventListeners(t *testing.T) {
assert.True(t, jobRanPassed)
assert.True(t, beforeCallbackPassed)
assert.True(t, afterCallbackPassed)
assert.True(t, beforeJobCallback)
assert.True(t, afterJobCallback)
assert.True(t, onErrorCallback)
assert.True(t, noErrorCallback)
})
}

Expand Down
14 changes: 11 additions & 3 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,6 @@ func (s *Scheduler) runContinuous(job *Job) {
if !job.getStartsImmediately() {
job.setStartsImmediately(true)
} else {
//if job.neverRan() {
// job.setLastRun(s.now())
//}
s.run(job)
}
nr := next.dateTime.Sub(s.now())
Expand Down Expand Up @@ -1433,3 +1430,14 @@ func (s *Scheduler) StopBlockingChan() {
func (s *Scheduler) WithDistributedLocker(l Locker) {
s.executor.distributedLocker = l
}

// RegisterEventListeners accepts EventListeners and registers them for all jobs
// in the scheduler at the time this function is called.
// The event listeners are then called at the times described by each listener.
// If a new job is added, an additional call to this method, or the job specific
// version must be executed in order for the new job to trigger event listeners.
func (s *Scheduler) RegisterEventListeners(eventListeners ...EventListener) {
for _, job := range s.Jobs() {
job.RegisterEventListeners(eventListeners...)
}
}

0 comments on commit ffa4c91

Please sign in to comment.