Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Added start time for supporting restarts for fixed rate schedules #476

Merged
merged 10 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46
google.golang.org/grpc v1.46.0
google.golang.org/protobuf v1.28.0
gorm.io/driver/mysql v1.4.4
gorm.io/driver/postgres v1.4.5
gorm.io/driver/sqlite v1.1.1
gorm.io/gorm v1.24.1-0.20221019064659-5dd2bb482755
Expand Down Expand Up @@ -185,7 +186,6 @@ require (
gopkg.in/square/go-jose.v2 v2.5.2-0.20210529014059-a5c7eec3c614 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/mysql v1.4.4 // indirect
k8s.io/apiextensions-apiserver v0.24.1 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand All @@ -208,4 +208,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
)

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84
replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1457,8 +1457,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a h1:pTbOzzOh94yt3WHt7uqZ1m6PUZth++GspSJGK9dEBC4=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
Expand Down
8 changes: 4 additions & 4 deletions scheduler/core/gocron_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type GoCronJob struct {
nameOfSchedule string
schedule models.SchedulableEntity
funcWithSchedule TimedFuncWithSchedule
lastTime *time.Time
lastExecTime *time.Time
catchupFromTime *time.Time
entryID cron.EntryID
}
Expand All @@ -34,8 +34,8 @@ func (g *GoCronJob) Run(t time.Time) {
if err := g.funcWithSchedule(jobFuncCtxWithLabel, g.schedule, t); err != nil {
logger.Errorf(jobFuncCtxWithLabel, "Got error while scheduling %v", err)
}
// Update the lastTime only if new trigger time t is after lastTime.
if g.lastTime == nil || g.lastTime.Before(t) {
g.lastTime = &t
// Update the lastExecTime only if new trigger time t is after lastExecTime.
if g.lastExecTime == nil || g.lastExecTime.Before(t) {
g.lastExecTime = &t
}
}
18 changes: 11 additions & 7 deletions scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap
g.jobStore.Range(func(key, value interface{}) bool {
job := value.(*GoCronJob)
scheduleIdentifier := key.(string)
if job.lastTime != nil {
snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastTime)
if job.lastExecTime != nil {
snapshot.UpdateLastExecutionTime(scheduleIdentifier, job.lastExecTime)
}
return true
})
Expand All @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap

// ScheduleJob allows to schedule a job using the implemented scheduler
func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity,
funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error {
funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error {

nameOfSchedule := identifier.GetScheduleName(ctx, schedule)

Expand All @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched
return nil
}

// Update the catchupFrom time as the lastTime.
// Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Update the catchupFrom time as the lastExecTime.
// Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Once initialized we wont be changing the catchupTime until the next boot
job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule,
catchupFromTime: lastTime, ctx: ctx}
catchupFromTime: lastExecTime, lastExecTime: lastExecTime, ctx: ctx}

// Define the timed job function to be used for the callback at the scheduled time
//jobFunc := job.GetTimedFunc(ctx, g.metrics)
Expand Down Expand Up @@ -271,7 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo
var jobFunc cron.TimedFuncJob
jobFunc = job.Run

entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
var lastTime time.Time
if job.lastExecTime != nil {
lastTime = *job.lastExecTime
}
entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastTime)
// Update the enttry id in the job which is handle to be used for removal
job.entryID = entryID
logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v",
Expand Down
30 changes: 18 additions & 12 deletions scheduler/core/gocron_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//go:build !race
// +build !race

package core

import (
Expand All @@ -6,17 +9,17 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"

adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flyteadmin/scheduler/executor/mocks"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"
)

var scheduleCron models.SchedulableEntity
Expand All @@ -26,9 +29,6 @@ var scheduleFixedDeactivated models.SchedulableEntity
var scheduleNonExistentDeActivated models.SchedulableEntity

func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
var schedules []models.SchedulableEntity
True := true
False := false
Expand Down Expand Up @@ -104,15 +104,21 @@ func setup(t *testing.T, subscope string, useUtcTz bool) *GoCronScheduler {
schedules = append(schedules, scheduleCronDeactivated)
schedules = append(schedules, scheduleFixedDeactivated)
schedules = append(schedules, scheduleNonExistentDeActivated)
return setupWithSchedules(t, subscope, schedules, useUtcTz)
}

func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity, useUtcTz bool) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor, useUtcTz)
goCronScheduler, ok := g.(*GoCronScheduler)
assert.True(t, ok)
goCronScheduler.UpdateSchedules(context.Background(), schedules)
assert.True(t, ok)
goCronScheduler.BootStrapSchedulesFromSnapShot(context.Background(), schedules, snapshot)
goCronScheduler.CatchupAll(context.Background(), time.Now())
return goCronScheduler
Expand All @@ -135,7 +141,7 @@ func TestUseUTCTz(t *testing.T) {
func TestCalculateSnapshot(t *testing.T) {
t.Run("empty snapshot", func(t *testing.T) {
ctx := context.Background()
g := setup(t, "empty_snapshot", false)
g := setupWithSchedules(t, "empty_snapshot", nil, false)
snapshot := g.CalculateSnapshot(ctx)
assert.NotNil(t, snapshot)
assert.True(t, snapshot.IsEmpty())
Expand All @@ -146,7 +152,7 @@ func TestCalculateSnapshot(t *testing.T) {
g.jobStore.Range(func(key, value interface{}) bool {
currTime := time.Now()
job := value.(*GoCronJob)
job.lastTime = &currTime
job.lastExecTime = &currTime
return true
})
snapshot := g.CalculateSnapshot(ctx)
Expand Down
2 changes: 1 addition & 1 deletion scheduler/core/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type TimedFuncWithSchedule func(ctx context.Context, s models.SchedulableEntity,
// calculating snapshot of the schedules , bootstrapping the scheduler from the snapshot as well as the catcup functionality
type Scheduler interface {
// ScheduleJob allows to schedule a job using the implemented scheduler
ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastT *time.Time) error
ScheduleJob(ctx context.Context, s models.SchedulableEntity, f TimedFuncWithSchedule, lastExecTime *time.Time) error
// DeScheduleJob allows to remove a scheduled job using the implemented scheduler
DeScheduleJob(ctx context.Context, s models.SchedulableEntity)
// BootStrapSchedulesFromSnapShot allows to initialize the scheduler from a previous snapshot of the schedule executions
Expand Down
95 changes: 95 additions & 0 deletions tests/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//go:build integration
// +build integration

package tests

import (
"context"
"sync"
"testing"
"time"

"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"

adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime"
scheduler "github.com/flyteorg/flyteadmin/scheduler/core"
"github.com/flyteorg/flyteadmin/scheduler/executor/mocks"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"
)

func TestScheduleJob(t *testing.T) {
ctx := context.Background()
True := true
now := time.Now()
scheduleFixed := models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: now,
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "fixed1",
Version: "version1",
},
FixedRateValue: 1,
Unit: admin.FixedRateUnit_MINUTE,
Active: &True,
}

c := cron.New()
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("schedule_job")
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := scheduler.NewGoCronScheduler(context.Background(), []models.SchedulableEntity{}, schedulerScope, snapshot, rateLimiter, executor, false)
c.Start()

tests := []struct {
testName string
lastExecTime *time.Time
assertionFunc func(t assert.TestingT, expected, actual interface{}, msgAndArgs ...interface{}) bool
}{
{testName: "using_schedule_time", lastExecTime: &now, assertionFunc: assert.Equal},
{testName: "without_schedule_time", lastExecTime: nil, assertionFunc: assert.NotEqual},
}
wg := &sync.WaitGroup{}
for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
wg.Add(1)
timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error {
tc.assertionFunc(t, now, scheduleTime)
wg.Done()
return nil
}
err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, tc.lastExecTime)
assert.NoError(t, err)
})
}

select {
case <-time.After(time.Minute * 2):
assert.Fail(t, "timed job didn't get triggered")
case <-wait(wg):
c.Stop()
break
}
}

func wait(wg *sync.WaitGroup) chan bool {
ch := make(chan bool)
go func() {
wg.Wait()
ch <- true
}()
return ch
}