Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Added start time for supporting restarts for fixed rate schedules #476

Merged
merged 10 commits into from
May 8, 2023
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84
replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.1.21 h1:09/xqYWFdUA22bVWKLjkSzhhSJfaJmDAraczpJ/Yiis=
github.com/flyteorg/flyteidl v1.1.21/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I=
github.com/flyteorg/flyteidl v1.2.0 h1:snJPpc5a5Gr4GXYiAMX6Io1edT91ZxN/7oE6uhydrvk=
github.com/flyteorg/flyteidl v1.2.0/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I=
github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4=
Expand Down Expand Up @@ -1461,8 +1459,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a h1:pTbOzzOh94yt3WHt7uqZ1m6PUZth++GspSJGK9dEBC4=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand Down
14 changes: 9 additions & 5 deletions scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap

// ScheduleJob allows to schedule a job using the implemented scheduler
func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity,
funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error {
funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error {

nameOfSchedule := identifier.GetScheduleName(ctx, schedule)

Expand All @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched
return nil
}

// Update the catchupFrom time as the lastTime.
// Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Update the catchupFrom time as the lastExecTime.
// Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Once initialized we wont be changing the catchupTime until the next boot
job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule,
catchupFromTime: lastTime, ctx: ctx}
catchupFromTime: lastExecTime, lastTime: lastExecTime, ctx: ctx}

// Define the timed job function to be used for the callback at the scheduled time
//jobFunc := job.GetTimedFunc(ctx, g.metrics)
Expand Down Expand Up @@ -271,7 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo
var jobFunc cron.TimedFuncJob
jobFunc = job.Run

entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
var lastExecTime time.Time
if job.lastTime != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should this field name change to job.lastExecTime too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing this would mean maintaining backward compat. I will clean this up when i next make change to this file

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, the JobStore will get loaded again after new version, so it will start using the new field after the restart, so we don't have to worry about backward compat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, can you verify that there are indeed no problems on restart locally?

lastExecTime = *job.lastTime
}
entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastExecTime)
// Update the enttry id in the job which is handle to be used for removal
job.entryID = entryID
logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v",
Expand Down
34 changes: 17 additions & 17 deletions scheduler/core/gocron_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build !race
// +build !race

package core

import (
Expand All @@ -6,17 +9,17 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"

adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flyteadmin/scheduler/executor/mocks"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"
)

var scheduleCron models.SchedulableEntity
Expand All @@ -26,9 +29,6 @@ var scheduleFixedDeactivated models.SchedulableEntity
var scheduleNonExistentDeActivated models.SchedulableEntity

func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
var schedules []models.SchedulableEntity
True := true
False := false
Expand Down Expand Up @@ -104,15 +104,21 @@ func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler {
schedules = append(schedules, scheduleCronDeactivated)
schedules = append(schedules, scheduleFixedDeactivated)
schedules = append(schedules, scheduleNonExistentDeActivated)
return setupWithSchedules(t, subscope, schedules, useUtcTz)
}

func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity, useUtcTz bool) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor, useUtcTz)
goCronScheduler, ok := g.(*GoCronScheduler)
assert.True(t, ok)
goCronScheduler.UpdateSchedules(context.Background(), schedules)
assert.True(t, ok)
goCronScheduler.BootStrapSchedulesFromSnapShot(context.Background(), schedules, snapshot)
goCronScheduler.CatchupAll(context.Background(), time.Now())
return goCronScheduler
Expand All @@ -135,20 +141,14 @@ func TestUseUTCTz(t *testing.T) {
func TestCalculateSnapshot(t *testing.T) {
t.Run("empty snapshot", func(t *testing.T) {
ctx := context.Background()
g := setup(t, "empty_snapshot", false)
g := setupWithSchedules(t, "empty_snapshot", nil, false)
snapshot := g.CalculateSnapshot(ctx)
assert.NotNil(t, snapshot)
assert.True(t, snapshot.IsEmpty())
})
t.Run("non empty snapshot", func(t *testing.T) {
ctx := context.Background()
g := setup(t, "non_empty_snapshot", false)
g.jobStore.Range(func(key, value interface{}) bool {
currTime := time.Now()
job := value.(*GoCronJob)
job.lastTime = &currTime
return true
})
snapshot := g.CalculateSnapshot(ctx)
assert.NotNil(t, snapshot)
assert.False(t, snapshot.IsEmpty())
Expand Down
110 changes: 110 additions & 0 deletions tests/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
//go:build integration
// +build integration

package tests

import (
"context"
"sync"
"testing"
"time"

"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"

adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime"
scheduler "github.com/flyteorg/flyteadmin/scheduler/core"
"github.com/flyteorg/flyteadmin/scheduler/executor/mocks"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"
)

func TestScheduleJob(t *testing.T) {
ctx := context.Background()
True := true
now := time.Now()
scheduleFixed := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: now,
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "fixed1",
Version: "version1",
},
FixedRateValue: 1,
Unit: admin.FixedRateUnit_MINUTE,
Active: &True,
}
t.Run("using schedule time", func(t *testing.T) {
wg := &sync.WaitGroup{}
wg.Add(1)
timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error {
assert.Equal(t, now, scheduleTime)
wg.Done()
return nil
}
c := cron.New()
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_time")
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false)
err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, &now)
c.Start()
assert.NoError(t, err)
select {
case <-time.After(time.Minute * 2):
assert.Fail(t, "timed job didn't get triggered")
case <-wait(wg):
break
}
})

t.Run("without schedule time", func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry if I'm being dense - can you point out how this test differs than the one above?

Copy link
Contributor Author

@pmahindrakar-oss pmahindrakar-oss Nov 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check L62 & L90,
The last Parameter to schedulerJob is different.
And also the assertion in timedFuncWithSchedule

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the test to remove redundant code which makes this test clearer

wg := &sync.WaitGroup{}
wg.Add(1)
timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error {
assert.NotEqual(t, now, scheduleTime)
wg.Done()
return nil
}
c := cron.New()
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_time")
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false)
err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, nil)
c.Start()
assert.NoError(t, err)
select {
case <-time.After(time.Minute * 2):
assert.Fail(t, "timed job didn't get triggered")
case <-wait(wg):
break
}
})

}

func wait(wg *sync.WaitGroup) chan bool {
ch := make(chan bool)
go func() {
wg.Wait()
ch <- true
}()
return ch
}