Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Added start time for supporting restarts for fixed rate schedules #476

Merged
merged 10 commits into from
May 8, 2023
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84
replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.1.21 h1:09/xqYWFdUA22bVWKLjkSzhhSJfaJmDAraczpJ/Yiis=
github.com/flyteorg/flyteidl v1.1.21/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I=
github.com/flyteorg/flyteidl v1.2.0 h1:snJPpc5a5Gr4GXYiAMX6Io1edT91ZxN/7oE6uhydrvk=
github.com/flyteorg/flyteidl v1.2.0/go.mod h1:f0AFl7RFycH7+JLq2th0ReH7v+Xse+QTw4jGdIxiS8I=
github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4=
Expand Down Expand Up @@ -1461,8 +1459,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a h1:pTbOzzOh94yt3WHt7uqZ1m6PUZth++GspSJGK9dEBC4=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand Down
8 changes: 4 additions & 4 deletions scheduler/core/gocron_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type GoCronJob struct {
nameOfSchedule string
schedule models.SchedulableEntity
funcWithSchedule TimedFuncWithSchedule
lastTime *time.Time
lastExecTime *time.Time
catchupFromTime *time.Time
entryID cron.EntryID
}
Expand All @@ -34,8 +34,8 @@ func (g *GoCronJob) Run(t time.Time) {
if err := g.funcWithSchedule(jobFuncCtxWithLabel, g.schedule, t); err != nil {
logger.Errorf(jobFuncCtxWithLabel, "Got error while scheduling %v", err)
}
// Update the lastTime only if new trigger time t is after lastTime.
if g.lastTime == nil || g.lastTime.Before(t) {
g.lastTime = &t
// Update the lastExecTime only if new trigger time t is after lastExecTime.
if g.lastExecTime == nil || g.lastExecTime.Before(t) {
g.lastExecTime = &t
}
}
18 changes: 11 additions & 7 deletions scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap
g.jobStore.Range(func(key, value interface{}) bool {
job := value.(*GoCronJob)
scheduleIdentifier := key.(string)
if job.lastTime != nil {
snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastTime)
if job.lastExecTime != nil {
snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastExecTime)
}
return true
})
Expand All @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap

// ScheduleJob allows to schedule a job using the implemented scheduler
func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity,
funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error {
funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error {

nameOfSchedule := identifier.GetScheduleName(ctx, schedule)

Expand All @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched
return nil
}

// Update the catchupFrom time as the lastTime.
// Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Update the catchupFrom time as the lastExecTime.
// Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Once initialized we wont be changing the catchupTime until the next boot
job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule,
catchupFromTime: lastTime, ctx: ctx}
catchupFromTime: lastExecTime, lastExecTime: lastExecTime, ctx: ctx}

// Define the timed job function to be used for the callback at the scheduled time
//jobFunc := job.GetTimedFunc(ctx, g.metrics)
Expand Down Expand Up @@ -271,7 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo
var jobFunc cron.TimedFuncJob
jobFunc = job.Run

entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
var lastTime time.Time
if job.lastExecTime != nil {
lastTime = *job.lastExecTime
}
entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastTime)
// Update the enttry id in the job which is handle to be used for removal
job.entryID = entryID
logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v",
Expand Down
30 changes: 18 additions & 12 deletions scheduler/core/gocron_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build !race
// +build !race

package core

import (
Expand All @@ -6,17 +9,17 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"

adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flyteadmin/scheduler/executor/mocks"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"
)

var scheduleCron models.SchedulableEntity
Expand All @@ -26,9 +29,6 @@ var scheduleFixedDeactivated models.SchedulableEntity
var scheduleNonExistentDeActivated models.SchedulableEntity

func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
var schedules []models.SchedulableEntity
True := true
False := false
Expand Down Expand Up @@ -104,15 +104,21 @@ func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler {
schedules = append(schedules, scheduleCronDeactivated)
schedules = append(schedules, scheduleFixedDeactivated)
schedules = append(schedules, scheduleNonExistentDeActivated)
return setupWithSchedules(t, subscope, schedules, useUtcTz)
}

func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity, useUtcTz bool) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor, useUtcTz)
goCronScheduler, ok := g.(*GoCronScheduler)
assert.True(t, ok)
goCronScheduler.UpdateSchedules(context.Background(), schedules)
assert.True(t, ok)
goCronScheduler.BootStrapSchedulesFromSnapShot(context.Background(), schedules, snapshot)
goCronScheduler.CatchupAll(context.Background(), time.Now())
return goCronScheduler
Expand All @@ -135,7 +141,7 @@ func TestUseUTCTz(t *testing.T) {
func TestCalculateSnapshot(t *testing.T) {
t.Run("empty snapshot", func(t *testing.T) {
ctx := context.Background()
g := setup(t, "empty_snapshot", false)
g := setupWithSchedules(t, "empty_snapshot", nil, false)
snapshot := g.CalculateSnapshot(ctx)
assert.NotNil(t, snapshot)
assert.True(t, snapshot.IsEmpty())
Expand All @@ -146,7 +152,7 @@ func TestCalculateSnapshot(t *testing.T) {
g.jobStore.Range(func(key, value interface{}) bool {
currTime := time.Now()
job := value.(*GoCronJob)
job.lastTime = &currTime
job.lastExecTime = &currTime
return true
})
snapshot := g.CalculateSnapshot(ctx)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/core/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type TimedFuncWithSchedule func(ctx context.Context, s models.SchedulableEntity,
// calculating snapshot of the schedules , bootstrapping the scheduler from the snapshot as well as the catcup functionality
type Scheduler interface {
// ScheduleJob allows to schedule a job using the implemented scheduler
ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastT *time.Time) error
ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastExecTime *time.Time) error
// DeScheduleJob allows to remove a scheduled job using the implemented scheduler
DeScheduleJob(ctx context.Context, s models.SchedulableEntity)
// BootStrapSchedulesFromSnapShot allows to initialize the scheduler from a previous snapshot of the schedule executions
Expand Down
95 changes: 95 additions & 0 deletions tests/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//go:build integration
// +build integration

package tests

import (
"context"
"sync"
"testing"
"time"

"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"

adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime"
scheduler "github.com/flyteorg/flyteadmin/scheduler/core"
"github.com/flyteorg/flyteadmin/scheduler/executor/mocks"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"
)

func TestScheduleJob(t *testing.T) {
ctx := context.Background()
True := true
now := time.Now()
scheduleFixed := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: now,
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "fixed1",
Version: "version1",
},
FixedRateValue: 1,
Unit: admin.FixedRateUnit_MINUTE,
Active: &True,
}

c := cron.New()
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_job")
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false)
c.Start()

tests := []struct {
testName string
lastExecTime *time.Time
assertionFunc func(t assert.TestingT, expected, actual interface{}, msgAndArgs ...interface{}) bool
}{
{testName: "using_schedule_time", lastExecTime: &now, assertionFunc: assert.Equal},
{testName: "without_schedule_time", lastExecTime: nil, assertionFunc: assert.NotEqual},
}
wg := &sync.WaitGroup{}
for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
wg.Add(1)
timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error {
tc.assertionFunc(t, now, scheduleTime)
wg.Done()
return nil
}
err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, tc.lastExecTime)
assert.NoError(t, err)
})
}

select {
case <-time.After(time.Minute * 2):
assert.Fail(t, "timed job didn't get triggered")
case <-wait(wg):
c.Stop()
break
}
}

func wait(wg *sync.WaitGroup) chan bool {
ch := make(chan bool)
go func() {
wg.Wait()
ch <- true
}()
return ch
}