From 9d9f40752954335142acf32bbc9cabc4bcd3f30d Mon Sep 17 00:00:00 2001 From: Prafulla Mahindrakar Date: Fri, 16 Sep 2022 19:05:15 +0530 Subject: [PATCH 1/9] Added race skip check Signed-off-by: Prafulla Mahindrakar --- go.mod | 2 +- go.sum | 6 +- scheduler/core/gocron_scheduler.go | 14 ++-- scheduler/core/gocron_scheduler_test.go | 100 ++++++++++++++++++++++-- 4 files changed, 105 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 4270a8a58..4859f2464 100644 --- a/go.mod +++ b/go.mod @@ -207,4 +207,4 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect ) -replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 +replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a diff --git a/go.sum b/go.sum index ea06e1e62..223f2bd88 100644 --- a/go.sum +++ b/go.sum @@ -352,8 +352,6 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.1.21 h1:09/xqYWFdUA22bVWKLjkSzhhSJfaJmDAraczpJ/Yiis= -github.com/flyteorg/flyteidl v1.1.21/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I= github.com/flyteorg/flyteidl v1.2.0 h1:snJPpc5a5Gr4GXYiAMX6Io1edT91ZxN/7oE6uhydrvk= github.com/flyteorg/flyteidl v1.2.0/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I= github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4= @@ -1461,8 +1459,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ= -github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a h1:pTbOzzOh94yt3WHt7uqZ1m6PUZth++GspSJGK9dEBC4= +github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA= github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= diff --git a/scheduler/core/gocron_scheduler.go b/scheduler/core/gocron_scheduler.go index a9d068b06..fbd47c499 100644 --- a/scheduler/core/gocron_scheduler.go +++ b/scheduler/core/gocron_scheduler.go @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap // ScheduleJob allows to schedule a job using the implemented scheduler func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity, - funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error { + funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error { nameOfSchedule := identifier.GetScheduleName(ctx, schedule) @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched return nil } - // Update the catchupFrom time as the lastTime. - // Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup + // Update the catchupFrom time as the lastExecTime. + // Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup // Once initialized we wont be changing the catchupTime until the next boot job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule, - catchupFromTime: lastTime, ctx: ctx} + catchupFromTime: lastExecTime, lastTime: lastExecTime, ctx: ctx} // Define the timed job function to be used for the callback at the scheduled time //jobFunc := job.GetTimedFunc(ctx, g.metrics) @@ -271,7 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo var jobFunc cron.TimedFuncJob jobFunc = job.Run - entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc) + var lastExecTime time.Time + if job.lastTime != nil { + lastExecTime = *job.lastTime + } + entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastExecTime) // Update the enttry id in the job which is handle to be used for removal job.entryID = entryID logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v", diff --git a/scheduler/core/gocron_scheduler_test.go b/scheduler/core/gocron_scheduler_test.go index ce4f7187c..bfc0b019d 100644 --- a/scheduler/core/gocron_scheduler_test.go +++ b/scheduler/core/gocron_scheduler_test.go @@ -1,11 +1,20 @@ +//go:build !race +// +build !race + package core import ( "context" "fmt" + "sync" "testing" "time" + "github.com/robfig/cron/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/time/rate" + adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyteadmin/pkg/runtime" "github.com/flyteorg/flyteadmin/scheduler/executor/mocks" @@ -13,10 +22,6 @@ import ( "github.com/flyteorg/flyteadmin/scheduler/snapshoter" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/promutils" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "golang.org/x/time/rate" ) var scheduleCron models.SchedulableEntity @@ -104,15 +109,21 @@ func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler { schedules = append(schedules, scheduleCronDeactivated) schedules = append(schedules, scheduleFixedDeactivated) schedules = append(schedules, scheduleNonExistentDeActivated) + return setupWithSchedules(t, subscope, schedules) +} + +func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity) *GoCronScheduler { + configuration := runtime.NewConfigurationProvider() + applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope) rateLimiter := rate.NewLimiter(1, 10) executor := new(mocks.Executor) - executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - snapshot := &snapshoter.SnapshotV1{} + executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor, useUtcTz) goCronScheduler, ok := g.(*GoCronScheduler) - assert.True(t, ok) goCronScheduler.UpdateSchedules(context.Background(), schedules) + assert.True(t, ok) goCronScheduler.BootStrapSchedulesFromSnapShot(context.Background(), schedules, snapshot) goCronScheduler.CatchupAll(context.Background(), time.Now()) return goCronScheduler @@ -290,3 +301,78 @@ func TestCatchUpAllSchedule(t *testing.T) { catchupSuccess := g.CatchupAll(ctx, toTime) assert.True(t, catchupSuccess) } + +func TestScheduleJob(t *testing.T) { + ctx := context.Background() + True := true + lastTime := time.Now() + scheduleFixed = models.SchedulableEntity{ + BaseModel: adminModels.BaseModel{ + UpdatedAt: time.Now(), + }, + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "fixed1", + Version: "version1", + }, + FixedRateValue: 1, + Unit: admin.FixedRateUnit_MINUTE, + Active: &True, + } + t.Run("using schedule time", func(t *testing.T) { + var schedules []models.SchedulableEntity + schedules = append(schedules, scheduleFixed) + wg := &sync.WaitGroup{} + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + assert.Equal(t, lastTime, scheduleTime) + wg.Done() + return nil + } + c := cron.New() + g := GoCronScheduler{cron: c, jobStore: sync.Map{}} + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, &lastTime) + c.Start() + assert.NoError(t, err) + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + break + } + }) + + t.Run("without schedule time", func(t *testing.T) { + var schedules []models.SchedulableEntity + schedules = append(schedules, scheduleFixed) + wg := &sync.WaitGroup{} + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + assert.NotEqual(t, lastTime, scheduleTime) + wg.Done() + return nil + } + c := cron.New() + g := GoCronScheduler{cron: c, jobStore: sync.Map{}} + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, nil) + c.Start() + assert.NoError(t, err) + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + break + } + }) + +} + +func wait(wg *sync.WaitGroup) chan bool { + ch := make(chan bool) + go func() { + wg.Wait() + ch <- true + }() + return ch +} From 8199f26483eb7ec66d3d8d3970d25b9c5a3cf85c Mon Sep 17 00:00:00 2001 From: Prafulla Mahindrakar Date: Fri, 16 Sep 2022 19:25:41 +0530 Subject: [PATCH 2/9] lint Signed-off-by: Prafulla Mahindrakar --- scheduler/core/gocron_scheduler_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/scheduler/core/gocron_scheduler_test.go b/scheduler/core/gocron_scheduler_test.go index bfc0b019d..544351e6a 100644 --- a/scheduler/core/gocron_scheduler_test.go +++ b/scheduler/core/gocron_scheduler_test.go @@ -321,8 +321,6 @@ func TestScheduleJob(t *testing.T) { Active: &True, } t.Run("using schedule time", func(t *testing.T) { - var schedules []models.SchedulableEntity - schedules = append(schedules, scheduleFixed) wg := &sync.WaitGroup{} wg.Add(1) timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { @@ -344,8 +342,6 @@ func TestScheduleJob(t *testing.T) { }) t.Run("without schedule time", func(t *testing.T) { - var schedules []models.SchedulableEntity - schedules = append(schedules, scheduleFixed) wg := &sync.WaitGroup{} wg.Add(1) timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { From 3db1d5f7399f6d6c7ea33cf8bb6cc73c5578f559 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Mon, 26 Sep 2022 18:58:19 +0530 Subject: [PATCH 3/9] Fixed unit tests Signed-off-by: pmahindrakar-oss --- scheduler/core/gocron_scheduler_test.go | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/scheduler/core/gocron_scheduler_test.go b/scheduler/core/gocron_scheduler_test.go index 544351e6a..f9eefde41 100644 --- a/scheduler/core/gocron_scheduler_test.go +++ b/scheduler/core/gocron_scheduler_test.go @@ -31,9 +31,6 @@ var scheduleFixedDeactivated models.SchedulableEntity var scheduleNonExistentDeActivated models.SchedulableEntity func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler { - configuration := runtime.NewConfigurationProvider() - applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() - schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope) var schedules []models.SchedulableEntity True := true False := false @@ -109,10 +106,10 @@ func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler { schedules = append(schedules, scheduleCronDeactivated) schedules = append(schedules, scheduleFixedDeactivated) schedules = append(schedules, scheduleNonExistentDeActivated) - return setupWithSchedules(t, subscope, schedules) + return setupWithSchedules(t, subscope, schedules, useUtcTz) } -func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity) *GoCronScheduler { +func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity, useUtcTz bool) *GoCronScheduler { configuration := runtime.NewConfigurationProvider() applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope) @@ -146,7 +143,7 @@ func TestUseUTCTz(t *testing.T) { func TestCalculateSnapshot(t *testing.T) { t.Run("empty snapshot", func(t *testing.T) { ctx := context.Background() - g := setup(t, "empty_snapshot", false) + g := setupWithSchedules(t, "empty_snapshot", nil, false) snapshot := g.CalculateSnapshot(ctx) assert.NotNil(t, snapshot) assert.True(t, snapshot.IsEmpty()) @@ -154,12 +151,6 @@ func TestCalculateSnapshot(t *testing.T) { t.Run("non empty snapshot", func(t *testing.T) { ctx := context.Background() g := setup(t, "non_empty_snapshot", false) - g.jobStore.Range(func(key, value interface{}) bool { - currTime := time.Now() - job := value.(*GoCronJob) - job.lastTime = &currTime - return true - }) snapshot := g.CalculateSnapshot(ctx) assert.NotNil(t, snapshot) assert.False(t, snapshot.IsEmpty()) From 62b7d74d989c5df03a4a3600e9d5e4a1ed0b3ac6 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Wed, 9 Nov 2022 12:26:41 +0530 Subject: [PATCH 4/9] Moved to integration test Signed-off-by: pmahindrakar-oss --- scheduler/core/gocron_scheduler_test.go | 73 ---------------- tests/scheduler_test.go | 110 ++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 73 deletions(-) create mode 100644 tests/scheduler_test.go diff --git a/scheduler/core/gocron_scheduler_test.go b/scheduler/core/gocron_scheduler_test.go index f9eefde41..40bfecd0b 100644 --- a/scheduler/core/gocron_scheduler_test.go +++ b/scheduler/core/gocron_scheduler_test.go @@ -6,11 +6,9 @@ package core import ( "context" "fmt" - "sync" "testing" "time" - "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "golang.org/x/time/rate" @@ -292,74 +290,3 @@ func TestCatchUpAllSchedule(t *testing.T) { catchupSuccess := g.CatchupAll(ctx, toTime) assert.True(t, catchupSuccess) } - -func TestScheduleJob(t *testing.T) { - ctx := context.Background() - True := true - lastTime := time.Now() - scheduleFixed = models.SchedulableEntity{ - BaseModel: adminModels.BaseModel{ - UpdatedAt: time.Now(), - }, - SchedulableEntityKey: models.SchedulableEntityKey{ - Project: "project", - Domain: "domain", - Name: "fixed1", - Version: "version1", - }, - FixedRateValue: 1, - Unit: admin.FixedRateUnit_MINUTE, - Active: &True, - } - t.Run("using schedule time", func(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { - assert.Equal(t, lastTime, scheduleTime) - wg.Done() - return nil - } - c := cron.New() - g := GoCronScheduler{cron: c, jobStore: sync.Map{}} - err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, &lastTime) - c.Start() - assert.NoError(t, err) - select { - case <-time.After(time.Minute * 2): - assert.Fail(t, "timed job didn't get triggered") - case <-wait(wg): - break - } - }) - - t.Run("without schedule time", func(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { - assert.NotEqual(t, lastTime, scheduleTime) - wg.Done() - return nil - } - c := cron.New() - g := GoCronScheduler{cron: c, jobStore: sync.Map{}} - err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, nil) - c.Start() - assert.NoError(t, err) - select { - case <-time.After(time.Minute * 2): - assert.Fail(t, "timed job didn't get triggered") - case <-wait(wg): - break - } - }) - -} - -func wait(wg *sync.WaitGroup) chan bool { - ch := make(chan bool) - go func() { - wg.Wait() - ch <- true - }() - return ch -} diff --git a/tests/scheduler_test.go b/tests/scheduler_test.go new file mode 100644 index 000000000..151ab0717 --- /dev/null +++ b/tests/scheduler_test.go @@ -0,0 +1,110 @@ +//go:build integration +// +build integration + +package tests + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/robfig/cron/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/time/rate" + + adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models" + "github.com/flyteorg/flyteadmin/pkg/runtime" + scheduler "github.com/flyteorg/flyteadmin/scheduler/core" + "github.com/flyteorg/flyteadmin/scheduler/executor/mocks" + "github.com/flyteorg/flyteadmin/scheduler/repositories/models" + "github.com/flyteorg/flyteadmin/scheduler/snapshoter" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flytestdlib/promutils" +) + +func TestScheduleJob(t *testing.T) { + ctx := context.Background() + True := true + now := time.Now() + scheduleFixed := models.SchedulableEntity{ + BaseModel: adminModels.BaseModel{ + UpdatedAt: now, + }, + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "fixed1", + Version: "version1", + }, + FixedRateValue: 1, + Unit: admin.FixedRateUnit_MINUTE, + Active: &True, + } + t.Run("using schedule time", func(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + assert.Equal(t, now, scheduleTime) + wg.Done() + return nil + } + c := cron.New() + configuration := runtime.NewConfigurationProvider() + applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_time") + rateLimiter := rate.NewLimiter(1, 10) + executor := new(mocks.Executor) + snapshot := &snapshoter.SnapshotV1{} + executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false) + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, &now) + c.Start() + assert.NoError(t, err) + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + break + } + }) + + t.Run("without schedule time", func(t *testing.T) { + wg := &sync.WaitGroup{} + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + assert.NotEqual(t, now, scheduleTime) + wg.Done() + return nil + } + c := cron.New() + configuration := runtime.NewConfigurationProvider() + applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_time") + rateLimiter := rate.NewLimiter(1, 10) + executor := new(mocks.Executor) + snapshot := &snapshoter.SnapshotV1{} + executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false) + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, nil) + c.Start() + assert.NoError(t, err) + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + break + } + }) + +} + +func wait(wg *sync.WaitGroup) chan bool { + ch := make(chan bool) + go func() { + wg.Wait() + ch <- true + }() + return ch +} From 15830010d47f3ab9e286905fe5b859361c844bb9 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Thu, 10 Nov 2022 13:15:21 +0530 Subject: [PATCH 5/9] refactored integration test Signed-off-by: pmahindrakar-oss --- scheduler/core/gocron_scheduler.go | 6 +- tests/scheduler_test.go | 93 +++++++++++++----------------- 2 files changed, 42 insertions(+), 57 deletions(-) diff --git a/scheduler/core/gocron_scheduler.go b/scheduler/core/gocron_scheduler.go index fbd47c499..bed2842b2 100644 --- a/scheduler/core/gocron_scheduler.go +++ b/scheduler/core/gocron_scheduler.go @@ -271,11 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo var jobFunc cron.TimedFuncJob jobFunc = job.Run - var lastExecTime time.Time + var lastTime time.Time if job.lastTime != nil { - lastExecTime = *job.lastTime + lastTime = *job.lastTime } - entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastExecTime) + entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastTime) // Update the enttry id in the job which is handle to be used for removal job.entryID = entryID logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v", diff --git a/tests/scheduler_test.go b/tests/scheduler_test.go index 151ab0717..b9d441fed 100644 --- a/tests/scheduler_test.go +++ b/tests/scheduler_test.go @@ -42,62 +42,47 @@ func TestScheduleJob(t *testing.T) { Unit: admin.FixedRateUnit_MINUTE, Active: &True, } - t.Run("using schedule time", func(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { - assert.Equal(t, now, scheduleTime) - wg.Done() - return nil - } - c := cron.New() - configuration := runtime.NewConfigurationProvider() - applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() - schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_time") - rateLimiter := rate.NewLimiter(1, 10) - executor := new(mocks.Executor) - snapshot := &snapshoter.SnapshotV1{} - executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false) - err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, &now) - c.Start() - assert.NoError(t, err) - select { - case <-time.After(time.Minute * 2): - assert.Fail(t, "timed job didn't get triggered") - case <-wait(wg): - break - } - }) - t.Run("without schedule time", func(t *testing.T) { - wg := &sync.WaitGroup{} - wg.Add(1) - timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { - assert.NotEqual(t, now, scheduleTime) - wg.Done() - return nil - } - c := cron.New() - configuration := runtime.NewConfigurationProvider() - applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() - schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_time") - rateLimiter := rate.NewLimiter(1, 10) - executor := new(mocks.Executor) - snapshot := &snapshoter.SnapshotV1{} - executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false) - err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, nil) - c.Start() - assert.NoError(t, err) - select { - case <-time.After(time.Minute * 2): - assert.Fail(t, "timed job didn't get triggered") - case <-wait(wg): - break - } - }) + c := cron.New() + configuration := runtime.NewConfigurationProvider() + applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_job") + rateLimiter := rate.NewLimiter(1, 10) + executor := new(mocks.Executor) + snapshot := &snapshoter.SnapshotV1{} + executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false) + c.Start() + tests := []struct { + testName string + lastT *time.Time + assertionFunc func(t assert.TestingT, expected, actual interface{}, msgAndArgs ...interface{}) bool + }{ + {testName: "using_schedule_time", lastT: &now, assertionFunc: assert.Equal}, + {testName: "without_schedule_time", lastT: nil, assertionFunc: assert.NotEqual}, + } + wg := &sync.WaitGroup{} + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + tc.assertionFunc(t, now, scheduleTime) + wg.Done() + return nil + } + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, tc.lastT) + assert.NoError(t, err) + }) + } + + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + c.Stop() + break + } } func wait(wg *sync.WaitGroup) chan bool { From 045085f10284fcd9ea12bdfcc0b18ae7f6f181f6 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Thu, 10 Nov 2022 13:47:49 +0530 Subject: [PATCH 6/9] nit : rename to lastTime Signed-off-by: pmahindrakar-oss --- scheduler/core/gocron_scheduler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scheduler/core/gocron_scheduler.go b/scheduler/core/gocron_scheduler.go index bed2842b2..f30853eb6 100644 --- a/scheduler/core/gocron_scheduler.go +++ b/scheduler/core/gocron_scheduler.go @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap // ScheduleJob allows to schedule a job using the implemented scheduler func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity, - funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error { + funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error { nameOfSchedule := identifier.GetScheduleName(ctx, schedule) @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched return nil } - // Update the catchupFrom time as the lastExecTime. - // Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup + // Update the catchupFrom time as the lastTime. + // Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup // Once initialized we wont be changing the catchupTime until the next boot job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule, - catchupFromTime: lastExecTime, lastTime: lastExecTime, ctx: ctx} + catchupFromTime: lastTime, lastTime: lastTime, ctx: ctx} // Define the timed job function to be used for the callback at the scheduled time //jobFunc := job.GetTimedFunc(ctx, g.metrics) From 57ef17930279e9ed23ab0d94617daa30f24dd4d5 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Thu, 10 Nov 2022 14:08:23 +0530 Subject: [PATCH 7/9] nit : revert Signed-off-by: pmahindrakar-oss --- scheduler/core/gocron_scheduler_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/scheduler/core/gocron_scheduler_test.go b/scheduler/core/gocron_scheduler_test.go index 40bfecd0b..2a590a9dc 100644 --- a/scheduler/core/gocron_scheduler_test.go +++ b/scheduler/core/gocron_scheduler_test.go @@ -149,6 +149,12 @@ func TestCalculateSnapshot(t *testing.T) { t.Run("non empty snapshot", func(t *testing.T) { ctx := context.Background() g := setup(t, "non_empty_snapshot", false) + g.jobStore.Range(func(key, value interface{}) bool { + currTime := time.Now() + job := value.(*GoCronJob) + job.lastTime = &currTime + return true + }) snapshot := g.CalculateSnapshot(ctx) assert.NotNil(t, snapshot) assert.False(t, snapshot.IsEmpty()) From a1b7913a7f8b9199a0ff9135972a756340e7c5d5 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Thu, 10 Nov 2022 14:18:51 +0530 Subject: [PATCH 8/9] lastTime -> lastExecTime Signed-off-by: pmahindrakar-oss --- scheduler/core/gocron_job.go | 8 ++++---- scheduler/core/gocron_scheduler.go | 16 ++++++++-------- scheduler/core/gocron_scheduler_test.go | 2 +- scheduler/core/scheduler.go | 2 +- tests/scheduler_test.go | 11 ++++------- 5 files changed, 18 insertions(+), 21 deletions(-) diff --git a/scheduler/core/gocron_job.go b/scheduler/core/gocron_job.go index e40a5af32..373d56461 100644 --- a/scheduler/core/gocron_job.go +++ b/scheduler/core/gocron_job.go @@ -19,7 +19,7 @@ type GoCronJob struct { nameOfSchedule string schedule models.SchedulableEntity funcWithSchedule TimedFuncWithSchedule - lastTime *time.Time + lastExecTime *time.Time catchupFromTime *time.Time entryID cron.EntryID } @@ -34,8 +34,8 @@ func (g *GoCronJob) Run(t time.Time) { if err := g.funcWithSchedule(jobFuncCtxWithLabel, g.schedule, t); err != nil { logger.Errorf(jobFuncCtxWithLabel, "Got error while scheduling %v", err) } - // Update the lastTime only if new trigger time t is after lastTime. - if g.lastTime == nil || g.lastTime.Before(t) { - g.lastTime = &t + // Update the lastExecTime only if new trigger time t is after lastExecTime. + if g.lastExecTime == nil || g.lastExecTime.Before(t) { + g.lastExecTime = &t } } diff --git a/scheduler/core/gocron_scheduler.go b/scheduler/core/gocron_scheduler.go index f30853eb6..0d0f78830 100644 --- a/scheduler/core/gocron_scheduler.go +++ b/scheduler/core/gocron_scheduler.go @@ -100,8 +100,8 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap g.jobStore.Range(func(key, value interface{}) bool { job := value.(*GoCronJob) scheduleIdentifier := key.(string) - if job.lastTime != nil { - snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastTime) + if job.lastExecTime != nil { + snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastExecTime) } return true }) @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap // ScheduleJob allows to schedule a job using the implemented scheduler func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity, - funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error { + funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error { nameOfSchedule := identifier.GetScheduleName(ctx, schedule) @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched return nil } - // Update the catchupFrom time as the lastTime. - // Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup + // Update the catchupFrom time as the lastExecTime. + // Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup // Once initialized we wont be changing the catchupTime until the next boot job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule, - catchupFromTime: lastTime, lastTime: lastTime, ctx: ctx} + catchupFromTime: lastExecTime, lastExecTime: lastExecTime, ctx: ctx} // Define the timed job function to be used for the callback at the scheduled time //jobFunc := job.GetTimedFunc(ctx, g.metrics) @@ -272,8 +272,8 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo jobFunc = job.Run var lastTime time.Time - if job.lastTime != nil { - lastTime = *job.lastTime + if job.lastExecTime != nil { + lastTime = *job.lastExecTime } entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastTime) // Update the enttry id in the job which is handle to be used for removal diff --git a/scheduler/core/gocron_scheduler_test.go b/scheduler/core/gocron_scheduler_test.go index 2a590a9dc..3a936e061 100644 --- a/scheduler/core/gocron_scheduler_test.go +++ b/scheduler/core/gocron_scheduler_test.go @@ -152,7 +152,7 @@ func TestCalculateSnapshot(t *testing.T) { g.jobStore.Range(func(key, value interface{}) bool { currTime := time.Now() job := value.(*GoCronJob) - job.lastTime = &currTime + job.lastExecTime = &currTime return true }) snapshot := g.CalculateSnapshot(ctx) diff --git a/scheduler/core/scheduler.go b/scheduler/core/scheduler.go index b0ede2550..0f670e147 100644 --- a/scheduler/core/scheduler.go +++ b/scheduler/core/scheduler.go @@ -14,7 +14,7 @@ type TimedFuncWithSchedule func(ctx context.Context, s models.SchedulableEntity, // calculating snapshot of the schedules , bootstrapping the scheduler from the snapshot as well as the catcup functionality type Scheduler interface { // ScheduleJob allows to schedule a job using the implemented scheduler - ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastT *time.Time) error + ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastExecTime *time.Time) error // DeScheduleJob allows to remove a scheduled job using the implemented scheduler DeScheduleJob(ctx context.Context, s models.SchedulableEntity) // BootStrapSchedulesFromSnapShot allows to initialize the scheduler from a previous snapshot of the schedule executions diff --git a/tests/scheduler_test.go b/tests/scheduler_test.go index b9d441fed..61cad82ae 100644 --- a/tests/scheduler_test.go +++ b/tests/scheduler_test.go @@ -1,6 +1,3 @@ -//go:build integration -// +build integration - package tests import ( @@ -56,11 +53,11 @@ func TestScheduleJob(t *testing.T) { tests := []struct { testName string - lastT *time.Time + lastExecTime *time.Time assertionFunc func(t assert.TestingT, expected, actual interface{}, msgAndArgs ...interface{}) bool }{ - {testName: "using_schedule_time", lastT: &now, assertionFunc: assert.Equal}, - {testName: "without_schedule_time", lastT: nil, assertionFunc: assert.NotEqual}, + {testName: "using_schedule_time", lastExecTime: &now, assertionFunc: assert.Equal}, + {testName: "without_schedule_time", lastExecTime: nil, assertionFunc: assert.NotEqual}, } wg := &sync.WaitGroup{} for _, tc := range tests { @@ -71,7 +68,7 @@ func TestScheduleJob(t *testing.T) { wg.Done() return nil } - err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, tc.lastT) + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, tc.lastExecTime) assert.NoError(t, err) }) } From b06039bdb56e0047c532a3f62bd1c43c776b8b92 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Thu, 10 Nov 2022 15:17:29 +0530 Subject: [PATCH 9/9] integration test tag Signed-off-by: pmahindrakar-oss --- tests/scheduler_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/scheduler_test.go b/tests/scheduler_test.go index 61cad82ae..1e792966c 100644 --- a/tests/scheduler_test.go +++ b/tests/scheduler_test.go @@ -1,3 +1,6 @@ +//go:build integration +// +build integration + package tests import (