From 717bcc3c604bc04f296503025e2fa0a5a0cc5356 Mon Sep 17 00:00:00 2001 From: pmahindrakar-oss Date: Mon, 8 May 2023 12:13:00 -0700 Subject: [PATCH] Added start time for supporting restarts for fixed rate schedules (#476) * Added race skip check Signed-off-by: Prafulla Mahindrakar * lint Signed-off-by: Prafulla Mahindrakar * Fixed unit tests Signed-off-by: pmahindrakar-oss * Moved to integration test Signed-off-by: pmahindrakar-oss * refactored integration test Signed-off-by: pmahindrakar-oss * nit : rename to lastTime Signed-off-by: pmahindrakar-oss * nit : revert Signed-off-by: pmahindrakar-oss * lastTime -> lastExecTime Signed-off-by: pmahindrakar-oss * integration test tag Signed-off-by: pmahindrakar-oss --------- Signed-off-by: Prafulla Mahindrakar Signed-off-by: pmahindrakar-oss Signed-off-by: eduardo apolinario Co-authored-by: eduardo apolinario --- flyteadmin/go.mod | 4 +- flyteadmin/go.sum | 4 +- flyteadmin/scheduler/core/gocron_job.go | 8 +- flyteadmin/scheduler/core/gocron_scheduler.go | 18 ++-- .../scheduler/core/gocron_scheduler_test.go | 30 +++--- flyteadmin/scheduler/core/scheduler.go | 2 +- flyteadmin/tests/scheduler_test.go | 95 +++++++++++++++++++ 7 files changed, 133 insertions(+), 28 deletions(-) create mode 100644 flyteadmin/tests/scheduler_test.go diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 6d78ffdd1..00d8dc6ac 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -52,6 +52,7 @@ require ( google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46 google.golang.org/grpc v1.46.0 google.golang.org/protobuf v1.28.0 + gorm.io/driver/mysql v1.4.4 gorm.io/driver/postgres v1.4.5 gorm.io/driver/sqlite v1.1.1 gorm.io/gorm v1.24.1-0.20221019064659-5dd2bb482755 @@ -185,7 +186,6 @@ require ( gopkg.in/square/go-jose.v2 v2.5.2-0.20210529014059-a5c7eec3c614 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gorm.io/driver/mysql v1.4.4 // indirect k8s.io/apiextensions-apiserver v0.24.1 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/yaml v1.3.0 // indirect @@ -208,4 +208,4 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect ) -replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 +replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 8e166ce64..9ba7e3c50 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -1457,8 +1457,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ= -github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a h1:pTbOzzOh94yt3WHt7uqZ1m6PUZth++GspSJGK9dEBC4= +github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA= github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= diff --git a/flyteadmin/scheduler/core/gocron_job.go b/flyteadmin/scheduler/core/gocron_job.go index e40a5af32..373d56461 100644 --- a/flyteadmin/scheduler/core/gocron_job.go +++ b/flyteadmin/scheduler/core/gocron_job.go @@ -19,7 +19,7 @@ type GoCronJob struct { nameOfSchedule string schedule models.SchedulableEntity funcWithSchedule TimedFuncWithSchedule - lastTime *time.Time + lastExecTime *time.Time catchupFromTime *time.Time entryID cron.EntryID } @@ -34,8 +34,8 @@ func (g *GoCronJob) Run(t time.Time) { if err := g.funcWithSchedule(jobFuncCtxWithLabel, g.schedule, t); err != nil { logger.Errorf(jobFuncCtxWithLabel, "Got error while scheduling %v", err) } - // Update the lastTime only if new trigger time t is after lastTime. - if g.lastTime == nil || g.lastTime.Before(t) { - g.lastTime = &t + // Update the lastExecTime only if new trigger time t is after lastExecTime. + if g.lastExecTime == nil || g.lastExecTime.Before(t) { + g.lastExecTime = &t } } diff --git a/flyteadmin/scheduler/core/gocron_scheduler.go b/flyteadmin/scheduler/core/gocron_scheduler.go index a9d068b06..0d0f78830 100644 --- a/flyteadmin/scheduler/core/gocron_scheduler.go +++ b/flyteadmin/scheduler/core/gocron_scheduler.go @@ -100,8 +100,8 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap g.jobStore.Range(func(key, value interface{}) bool { job := value.(*GoCronJob) scheduleIdentifier := key.(string) - if job.lastTime != nil { - snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastTime) + if job.lastExecTime != nil { + snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastExecTime) } return true }) @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap // ScheduleJob allows to schedule a job using the implemented scheduler func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity, - funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error { + funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error { nameOfSchedule := identifier.GetScheduleName(ctx, schedule) @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched return nil } - // Update the catchupFrom time as the lastTime. - // Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup + // Update the catchupFrom time as the lastExecTime. + // Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup // Once initialized we wont be changing the catchupTime until the next boot job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule, - catchupFromTime: lastTime, ctx: ctx} + catchupFromTime: lastExecTime, lastExecTime: lastExecTime, ctx: ctx} // Define the timed job function to be used for the callback at the scheduled time //jobFunc := job.GetTimedFunc(ctx, g.metrics) @@ -271,7 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo var jobFunc cron.TimedFuncJob jobFunc = job.Run - entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc) + var lastTime time.Time + if job.lastExecTime != nil { + lastTime = *job.lastExecTime + } + entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastTime) // Update the enttry id in the job which is handle to be used for removal job.entryID = entryID logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v", diff --git a/flyteadmin/scheduler/core/gocron_scheduler_test.go b/flyteadmin/scheduler/core/gocron_scheduler_test.go index ce4f7187c..3a936e061 100644 --- a/flyteadmin/scheduler/core/gocron_scheduler_test.go +++ b/flyteadmin/scheduler/core/gocron_scheduler_test.go @@ -1,3 +1,6 @@ +//go:build !race +// +build !race + package core import ( @@ -6,6 +9,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/time/rate" + adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyteadmin/pkg/runtime" "github.com/flyteorg/flyteadmin/scheduler/executor/mocks" @@ -13,10 +20,6 @@ import ( "github.com/flyteorg/flyteadmin/scheduler/snapshoter" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/promutils" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "golang.org/x/time/rate" ) var scheduleCron models.SchedulableEntity @@ -26,9 +29,6 @@ var scheduleFixedDeactivated models.SchedulableEntity var scheduleNonExistentDeActivated models.SchedulableEntity func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler { - configuration := runtime.NewConfigurationProvider() - applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() - schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope) var schedules []models.SchedulableEntity True := true False := false @@ -104,15 +104,21 @@ func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler { schedules = append(schedules, scheduleCronDeactivated) schedules = append(schedules, scheduleFixedDeactivated) schedules = append(schedules, scheduleNonExistentDeActivated) + return setupWithSchedules(t, subscope, schedules, useUtcTz) +} + +func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity, useUtcTz bool) *GoCronScheduler { + configuration := runtime.NewConfigurationProvider() + applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope) rateLimiter := rate.NewLimiter(1, 10) executor := new(mocks.Executor) - executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - snapshot := &snapshoter.SnapshotV1{} + executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor, useUtcTz) goCronScheduler, ok := g.(*GoCronScheduler) - assert.True(t, ok) goCronScheduler.UpdateSchedules(context.Background(), schedules) + assert.True(t, ok) goCronScheduler.BootStrapSchedulesFromSnapShot(context.Background(), schedules, snapshot) goCronScheduler.CatchupAll(context.Background(), time.Now()) return goCronScheduler @@ -135,7 +141,7 @@ func TestUseUTCTz(t *testing.T) { func TestCalculateSnapshot(t *testing.T) { t.Run("empty snapshot", func(t *testing.T) { ctx := context.Background() - g := setup(t, "empty_snapshot", false) + g := setupWithSchedules(t, "empty_snapshot", nil, false) snapshot := g.CalculateSnapshot(ctx) assert.NotNil(t, snapshot) assert.True(t, snapshot.IsEmpty()) @@ -146,7 +152,7 @@ func TestCalculateSnapshot(t *testing.T) { g.jobStore.Range(func(key, value interface{}) bool { currTime := time.Now() job := value.(*GoCronJob) - job.lastTime = &currTime + job.lastExecTime = &currTime return true }) snapshot := g.CalculateSnapshot(ctx) diff --git a/flyteadmin/scheduler/core/scheduler.go b/flyteadmin/scheduler/core/scheduler.go index b0ede2550..0f670e147 100644 --- a/flyteadmin/scheduler/core/scheduler.go +++ b/flyteadmin/scheduler/core/scheduler.go @@ -14,7 +14,7 @@ type TimedFuncWithSchedule func(ctx context.Context, s models.SchedulableEntity, // calculating snapshot of the schedules , bootstrapping the scheduler from the snapshot as well as the catcup functionality type Scheduler interface { // ScheduleJob allows to schedule a job using the implemented scheduler - ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastT *time.Time) error + ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastExecTime *time.Time) error // DeScheduleJob allows to remove a scheduled job using the implemented scheduler DeScheduleJob(ctx context.Context, s models.SchedulableEntity) // BootStrapSchedulesFromSnapShot allows to initialize the scheduler from a previous snapshot of the schedule executions diff --git a/flyteadmin/tests/scheduler_test.go b/flyteadmin/tests/scheduler_test.go new file mode 100644 index 000000000..1e792966c --- /dev/null +++ b/flyteadmin/tests/scheduler_test.go @@ -0,0 +1,95 @@ +//go:build integration +// +build integration + +package tests + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/robfig/cron/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/time/rate" + + adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models" + "github.com/flyteorg/flyteadmin/pkg/runtime" + scheduler "github.com/flyteorg/flyteadmin/scheduler/core" + "github.com/flyteorg/flyteadmin/scheduler/executor/mocks" + "github.com/flyteorg/flyteadmin/scheduler/repositories/models" + "github.com/flyteorg/flyteadmin/scheduler/snapshoter" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flytestdlib/promutils" +) + +func TestScheduleJob(t *testing.T) { + ctx := context.Background() + True := true + now := time.Now() + scheduleFixed := models.SchedulableEntity{ + BaseModel: adminModels.BaseModel{ + UpdatedAt: now, + }, + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "fixed1", + Version: "version1", + }, + FixedRateValue: 1, + Unit: admin.FixedRateUnit_MINUTE, + Active: &True, + } + + c := cron.New() + configuration := runtime.NewConfigurationProvider() + applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_job") + rateLimiter := rate.NewLimiter(1, 10) + executor := new(mocks.Executor) + snapshot := &snapshoter.SnapshotV1{} + executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) + g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false) + c.Start() + + tests := []struct { + testName string + lastExecTime *time.Time + assertionFunc func(t assert.TestingT, expected, actual interface{}, msgAndArgs ...interface{}) bool + }{ + {testName: "using_schedule_time", lastExecTime: &now, assertionFunc: assert.Equal}, + {testName: "without_schedule_time", lastExecTime: nil, assertionFunc: assert.NotEqual}, + } + wg := &sync.WaitGroup{} + for _, tc := range tests { + t.Run(tc.testName, func(t *testing.T) { + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + tc.assertionFunc(t, now, scheduleTime) + wg.Done() + return nil + } + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, tc.lastExecTime) + assert.NoError(t, err) + }) + } + + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + c.Stop() + break + } +} + +func wait(wg *sync.WaitGroup) chan bool { + ch := make(chan bool) + go func() { + wg.Wait() + ch <- true + }() + return ch +}