diff --git a/go.mod b/go.mod index 637fd3b3e..a6e514121 100644 --- a/go.mod +++ b/go.mod @@ -207,4 +207,4 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect ) -replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 +replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a diff --git a/go.sum b/go.sum index 1917aac0a..39f641caf 100644 --- a/go.sum +++ b/go.sum @@ -352,8 +352,6 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.1.13 h1:xRUOu9+6c/zTZRTv+He1s4kX7uxmd/K5y7tAP598f8A= -github.com/flyteorg/flyteidl v1.1.13/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U= github.com/flyteorg/flyteidl v1.1.15 h1:h+T8yeya5OEt7POav0wZkjPdtUatilraVTuwrioqzuA= github.com/flyteorg/flyteidl v1.1.15/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U= github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4= @@ -1459,8 +1457,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ= -github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a h1:pTbOzzOh94yt3WHt7uqZ1m6PUZth++GspSJGK9dEBC4= +github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA= github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= diff --git a/scheduler/core/gocron_scheduler.go b/scheduler/core/gocron_scheduler.go index 15ca50f8c..701f0ce4a 100644 --- a/scheduler/core/gocron_scheduler.go +++ b/scheduler/core/gocron_scheduler.go @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap // ScheduleJob allows to schedule a job using the implemented scheduler func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity, - funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error { + funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error { nameOfSchedule := identifier.GetScheduleName(ctx, schedule) @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched return nil } - // Update the catchupFrom time as the lastTime. - // Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup + // Update the catchupFrom time as the lastExecTime. + // Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup // Once initialized we wont be changing the catchupTime until the next boot job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule, - catchupFromTime: lastTime, ctx: ctx} + catchupFromTime: lastExecTime, lastTime: lastExecTime, ctx: ctx} // Define the timed job function to be used for the callback at the scheduled time //jobFunc := job.GetTimedFunc(ctx, g.metrics) @@ -271,7 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo var jobFunc cron.TimedFuncJob jobFunc = job.Run - entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc) + var lastExecTime time.Time + if job.lastTime != nil { + lastExecTime = *job.lastTime + } + entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastExecTime) // Update the enttry id in the job which is handle to be used for removal job.entryID = entryID logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v", diff --git a/scheduler/core/gocron_scheduler_test.go b/scheduler/core/gocron_scheduler_test.go index afc60be72..eaf750ba3 100644 --- a/scheduler/core/gocron_scheduler_test.go +++ b/scheduler/core/gocron_scheduler_test.go @@ -1,11 +1,20 @@ +//go:build !race +// +build !race + package core import ( "context" "fmt" + "sync" "testing" "time" + "github.com/robfig/cron/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/time/rate" + adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyteadmin/pkg/runtime" "github.com/flyteorg/flyteadmin/scheduler/executor/mocks" @@ -13,10 +22,6 @@ import ( "github.com/flyteorg/flyteadmin/scheduler/snapshoter" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flytestdlib/promutils" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "golang.org/x/time/rate" ) var scheduleCron models.SchedulableEntity @@ -26,9 +31,6 @@ var scheduleFixedDeactivated models.SchedulableEntity var scheduleNonExistentDeActivated models.SchedulableEntity func setup(t *testing.T, subscope string) *GoCronScheduler { - configuration := runtime.NewConfigurationProvider() - applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() - schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope) var schedules []models.SchedulableEntity True := true False := false @@ -104,15 +106,21 @@ func setup(t *testing.T, subscope string) *GoCronScheduler { schedules = append(schedules, scheduleCronDeactivated) schedules = append(schedules, scheduleFixedDeactivated) schedules = append(schedules, scheduleNonExistentDeActivated) + return setupWithSchedules(t, subscope, schedules) +} + +func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity) *GoCronScheduler { + configuration := runtime.NewConfigurationProvider() + applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig() + schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope) rateLimiter := rate.NewLimiter(1, 10) executor := new(mocks.Executor) - executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) - snapshot := &snapshoter.SnapshotV1{} + executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil) g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor) goCronScheduler, ok := g.(*GoCronScheduler) - assert.True(t, ok) goCronScheduler.UpdateSchedules(context.Background(), schedules) + assert.True(t, ok) goCronScheduler.BootStrapSchedulesFromSnapShot(context.Background(), schedules, snapshot) goCronScheduler.CatchupAll(context.Background(), time.Now()) return goCronScheduler @@ -276,3 +284,78 @@ func TestCatchUpAllSchedule(t *testing.T) { catchupSuccess := g.CatchupAll(ctx, toTime) assert.True(t, catchupSuccess) } + +func TestScheduleJob(t *testing.T) { + ctx := context.Background() + True := true + lastTime := time.Now() + scheduleFixed = models.SchedulableEntity{ + BaseModel: adminModels.BaseModel{ + UpdatedAt: time.Now(), + }, + SchedulableEntityKey: models.SchedulableEntityKey{ + Project: "project", + Domain: "domain", + Name: "fixed1", + Version: "version1", + }, + FixedRateValue: 1, + Unit: admin.FixedRateUnit_MINUTE, + Active: &True, + } + t.Run("using schedule time", func(t *testing.T) { + var schedules []models.SchedulableEntity + schedules = append(schedules, scheduleFixed) + wg := &sync.WaitGroup{} + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + assert.Equal(t, lastTime, scheduleTime) + wg.Done() + return nil + } + c := cron.New() + g := GoCronScheduler{cron: c, jobStore: sync.Map{}} + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, &lastTime) + c.Start() + assert.NoError(t, err) + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + break + } + }) + + t.Run("without schedule time", func(t *testing.T) { + var schedules []models.SchedulableEntity + schedules = append(schedules, scheduleFixed) + wg := &sync.WaitGroup{} + wg.Add(1) + timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error { + assert.NotEqual(t, lastTime, scheduleTime) + wg.Done() + return nil + } + c := cron.New() + g := GoCronScheduler{cron: c, jobStore: sync.Map{}} + err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, nil) + c.Start() + assert.NoError(t, err) + select { + case <-time.After(time.Minute * 2): + assert.Fail(t, "timed job didn't get triggered") + case <-wait(wg): + break + } + }) + +} + +func wait(wg *sync.WaitGroup) chan bool { + ch := make(chan bool) + go func() { + wg.Wait() + ch <- true + }() + return ch +}