Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Added race skip check
Browse files Browse the repository at this point in the history
Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss committed Sep 16, 2022
1 parent f0e4c89 commit 6dcbad5
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,4 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84
replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,6 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.1.13 h1:xRUOu9+6c/zTZRTv+He1s4kX7uxmd/K5y7tAP598f8A=
github.com/flyteorg/flyteidl v1.1.13/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U=
github.com/flyteorg/flyteidl v1.1.15 h1:h+T8yeya5OEt7POav0wZkjPdtUatilraVTuwrioqzuA=
github.com/flyteorg/flyteidl v1.1.15/go.mod h1:SLTYz2JgIKvM5MbPVlMP7uILb65fnuuZQZFHHIEYh2U=
github.com/flyteorg/flyteplugins v1.0.10 h1:XBycM4aOSE/WlI8iP9vqogKGXy4FMfVCUUfzxJus/p4=
Expand Down Expand Up @@ -1459,8 +1457,8 @@ github.com/uber/jaeger-lib v1.5.0/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/Aaua
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 h1:EompdlTtH1GbcgfTNe+sAwHeDdeboYAvywrlVDbnixQ=
github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a h1:pTbOzzOh94yt3WHt7uqZ1m6PUZth++GspSJGK9dEBC4=
github.com/unionai/cron/v3 v3.0.2-0.20220915080349-5790c370e63a/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/unrolled/secure v0.0.0-20180918153822-f340ee86eb8b/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/unrolled/secure v0.0.0-20181005190816-ff9db2ff917f/go.mod h1:mnPT77IAdsi/kV7+Es7y+pXALeV3h7G6dQF6mNYjcLA=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
Expand Down
14 changes: 9 additions & 5 deletions scheduler/core/gocron_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (g *GoCronScheduler) CalculateSnapshot(ctx context.Context) snapshoter.Snap

// ScheduleJob allows to schedule a job using the implemented scheduler
func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.SchedulableEntity,
funcWithSchedule TimedFuncWithSchedule, lastTime *time.Time) error {
funcWithSchedule TimedFuncWithSchedule, lastExecTime *time.Time) error {

nameOfSchedule := identifier.GetScheduleName(ctx, schedule)

Expand All @@ -120,11 +120,11 @@ func (g *GoCronScheduler) ScheduleJob(ctx context.Context, schedule models.Sched
return nil
}

// Update the catchupFrom time as the lastTime.
// Here lastTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Update the catchupFrom time as the lastExecTime.
// Here lastExecTime is passed to this function only from BootStrapSchedulesFromSnapShot which is during bootup
// Once initialized we wont be changing the catchupTime until the next boot
job := &GoCronJob{nameOfSchedule: nameOfSchedule, schedule: schedule, funcWithSchedule: funcWithSchedule,
catchupFromTime: lastTime, ctx: ctx}
catchupFromTime: lastExecTime, lastTime: lastExecTime, ctx: ctx}

// Define the timed job function to be used for the callback at the scheduled time
//jobFunc := job.GetTimedFunc(ctx, g.metrics)
Expand Down Expand Up @@ -271,7 +271,11 @@ func (g *GoCronScheduler) AddFixedIntervalJob(ctx context.Context, job *GoCronJo
var jobFunc cron.TimedFuncJob
jobFunc = job.Run

entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc)
var lastExecTime time.Time
if job.lastTime != nil {
lastExecTime = *job.lastTime
}
entryID := g.cron.ScheduleTimedJob(cron.ConstantDelaySchedule{Delay: d}, jobFunc, lastExecTime)
// Update the enttry id in the job which is handle to be used for removal
job.entryID = entryID
logger.Infof(ctx, "successfully added the fixed rate schedule %s to the scheduler for schedule %+v",
Expand Down
103 changes: 93 additions & 10 deletions scheduler/core/gocron_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
//go:build !race
// +build !race

package core

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"

adminModels "github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flyteadmin/scheduler/executor/mocks"
"github.com/flyteorg/flyteadmin/scheduler/repositories/models"
"github.com/flyteorg/flyteadmin/scheduler/snapshoter"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"
)

var scheduleCron models.SchedulableEntity
Expand All @@ -26,9 +31,6 @@ var scheduleFixedDeactivated models.SchedulableEntity
var scheduleNonExistentDeActivated models.SchedulableEntity

func setup(t *testing.T, subscope string) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
var schedules []models.SchedulableEntity
True := true
False := false
Expand Down Expand Up @@ -104,15 +106,21 @@ func setup(t *testing.T, subscope string) *GoCronScheduler {
schedules = append(schedules, scheduleCronDeactivated)
schedules = append(schedules, scheduleFixedDeactivated)
schedules = append(schedules, scheduleNonExistentDeActivated)
return setupWithSchedules(t, subscope, schedules)
}

func setupWithSchedules(t *testing.T, subscope string, schedules []models.SchedulableEntity) *GoCronScheduler {
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope(subscope)
rateLimiter := rate.NewLimiter(1, 10)
executor := new(mocks.Executor)
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)

snapshot := &snapshoter.SnapshotV1{}
executor.OnExecuteMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
g := NewGoCronScheduler(context.Background(), schedules, schedulerScope, snapshot, rateLimiter, executor)
goCronScheduler, ok := g.(*GoCronScheduler)
assert.True(t, ok)
goCronScheduler.UpdateSchedules(context.Background(), schedules)
assert.True(t, ok)
goCronScheduler.BootStrapSchedulesFromSnapShot(context.Background(), schedules, snapshot)
goCronScheduler.CatchupAll(context.Background(), time.Now())
return goCronScheduler
Expand Down Expand Up @@ -276,3 +284,78 @@ func TestCatchUpAllSchedule(t *testing.T) {
catchupSuccess := g.CatchupAll(ctx, toTime)
assert.True(t, catchupSuccess)
}

func TestScheduleJob(t *testing.T) {
ctx := context.Background()
True := true
lastTime := time.Now()
scheduleFixed = models.SchedulableEntity{
BaseModel: adminModels.BaseModel{
UpdatedAt: time.Now(),
},
SchedulableEntityKey: models.SchedulableEntityKey{
Project: "project",
Domain: "domain",
Name: "fixed1",
Version: "version1",
},
FixedRateValue: 1,
Unit: admin.FixedRateUnit_MINUTE,
Active: &True,
}
t.Run("using schedule time", func(t *testing.T) {
var schedules []models.SchedulableEntity
schedules = append(schedules, scheduleFixed)
wg := &sync.WaitGroup{}
wg.Add(1)
timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error {
assert.Equal(t, lastTime, scheduleTime)
wg.Done()
return nil
}
c := cron.New()
g := GoCronScheduler{cron: c, jobStore: sync.Map{}}
err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, &lastTime)
c.Start()
assert.NoError(t, err)
select {
case <-time.After(time.Minute * 2):
assert.Fail(t, "timed job didn't get triggered")
case <-wait(wg):
break
}
})

t.Run("without schedule time", func(t *testing.T) {
var schedules []models.SchedulableEntity
schedules = append(schedules, scheduleFixed)
wg := &sync.WaitGroup{}
wg.Add(1)
timedFuncWithSchedule := func(jobCtx context.Context, schedule models.SchedulableEntity, scheduleTime time.Time) error {
assert.NotEqual(t, lastTime, scheduleTime)
wg.Done()
return nil
}
c := cron.New()
g := GoCronScheduler{cron: c, jobStore: sync.Map{}}
err := g.ScheduleJob(ctx, scheduleFixed, timedFuncWithSchedule, nil)
c.Start()
assert.NoError(t, err)
select {
case <-time.After(time.Minute * 2):
assert.Fail(t, "timed job didn't get triggered")
case <-wait(wg):
break
}
})

}

func wait(wg *sync.WaitGroup) chan bool {
ch := make(chan bool)
go func() {
wg.Wait()
ch <- true
}()
return ch
}

0 comments on commit 6dcbad5

Please sign in to comment.