Skip to content

Commit

Permalink
Add configuration for launchplan cache resync duration (#115)
Browse files Browse the repository at this point in the history
## Overview
Currently, the launchplan cache resync duration uses the [DownstreamEval duration configuration](https://github.com/unionai/flyte/blob/f1569a2be356f372fb5dadd88b111758011f2f12/flytepropeller/pkg/controller/controller.go#L340-L340), which is also used for the [sync period on the k8s client](https://github.com/unionai/flyte/blob/master/flytepropeller/cmd/controller/cmd/root.go#L143-L143). This means that if we want to configure a more aggressive launchplan cache resync, we also incur the overhead of syncing all k8s resources (e.g. Pods from `PodPlugin`). By adding a separate configuration value, we can tune the two independently.

## Test Plan
Tested locally with both the default and a 1s configuration. This should also be deployed in dogfood for peace of mind.

## Rollout Plan (if applicable)
The release should leave this value unconfigured so that it falls back to the default of 30s. For specific use cases (e.g. Spotify) we will aggressively reduce it (e.g. to 1s).

## Upstream Changes
- [ ] To be upstreamed

## Jira Issue
https://unionai.atlassian.net/browse/CLOUD-1574

## Checklist
* [x] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
hamersaw authored Mar 6, 2024
1 parent d0baca9 commit f4c108d
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 18 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter

var launchPlanActor launchplan.FlyteAdmin
if cfg.EnableAdminLauncher {
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration,
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient,
launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"), store)
if err != nil {
logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package launchplan
import (
"context"
"fmt"
"time"

"github.com/golang/protobuf/ptypes/wrappers"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -311,14 +310,14 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
}

func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient,
syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) {
cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) {
exec := &adminLaunchPlanExecutor{
adminClient: client,
store: store,
}

rateLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(cfg.TPS), cfg.Burst)}
c, err := cache.NewAutoRefreshCache("admin-launcher", exec.syncItem, rateLimiter, syncPeriod, cfg.Workers, cfg.MaxCacheSize, scope)
c, err := cache.NewAutoRefreshCache("admin-launcher", exec.syncItem, rateLimiter, cfg.CacheResyncDuration.Duration, cfg.Workers, cfg.MaxCacheSize, scope)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/cache"
mocks2 "github.com/flyteorg/flyte/flytestdlib/cache/mocks"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
Expand All @@ -26,6 +27,9 @@ import (

func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
ctx := context.TODO()
adminConfig := defaultAdminConfig
adminConfig.CacheResyncDuration = config.Duration{Duration: time.Millisecond}

id := &core.WorkflowExecutionIdentifier{
Name: "n",
Domain: "d",
Expand All @@ -38,7 +42,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {

t.Run("happy", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)
mockClient.On("GetExecution",
ctx,
Expand All @@ -65,7 +69,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }),
).Return(nil, status.Error(codes.NotFound, ""))

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

assert.NoError(t, exec.Initialize(ctx))
Expand Down Expand Up @@ -111,7 +115,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
mock.MatchedBy(func(o *admin.WorkflowExecutionGetRequest) bool { return true }),
).Return(nil, status.Error(codes.Canceled, ""))

exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

assert.NoError(t, exec.Initialize(ctx))
Expand Down Expand Up @@ -146,6 +150,8 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {

func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
ctx := context.TODO()
adminConfig := defaultAdminConfig
adminConfig.CacheResyncDuration = config.Duration{Duration: time.Second}
id := &core.WorkflowExecutionIdentifier{
Name: "n",
Domain: "d",
Expand All @@ -157,7 +163,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
t.Run("happy", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
mockClient.On("CreateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool {
Expand Down Expand Up @@ -195,7 +201,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
Name: "orig",
},
}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
mockClient.On("RecoverExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionRecoverRequest) bool {
Expand Down Expand Up @@ -231,7 +237,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
Name: "orig",
},
}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)

recoveryErr := status.Error(codes.NotFound, "foo")
Expand Down Expand Up @@ -273,7 +279,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
t.Run("notFound", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
mockClient.On("CreateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }),
Expand Down Expand Up @@ -301,7 +307,7 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
t.Run("other", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
mockClient.On("CreateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionCreateRequest) bool { return true }),
Expand Down Expand Up @@ -329,6 +335,8 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {

func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {
ctx := context.TODO()
adminConfig := defaultAdminConfig
adminConfig.CacheResyncDuration = config.Duration{Duration: time.Second}
id := &core.WorkflowExecutionIdentifier{
Name: "n",
Domain: "d",
Expand All @@ -341,7 +349,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {
t.Run("happy", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
mockClient.On("TerminateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }),
Expand All @@ -354,7 +362,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {
t.Run("notFound", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
mockClient.On("TerminateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }),
Expand All @@ -367,7 +375,7 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {
t.Run("other", func(t *testing.T) {

mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
mockClient.On("TerminateExecution",
ctx,
mock.MatchedBy(func(o *admin.ExecutionTerminateRequest) bool { return o.Id == id && o.Cause == reason }),
Expand All @@ -381,6 +389,8 @@ func TestAdminLaunchPlanExecutor_Kill(t *testing.T) {

func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) {
ctx := context.TODO()
adminConfig := defaultAdminConfig
adminConfig.CacheResyncDuration = config.Duration{Duration: time.Second}
id := &core.Identifier{
ResourceType: core.ResourceType_LAUNCH_PLAN,
Name: "n",
Expand All @@ -393,7 +403,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) {

t.Run("launch plan found", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)
mockClient.OnGetLaunchPlanMatch(
ctx,
Expand All @@ -406,7 +416,7 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) {

t.Run("launch plan not found", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Second, defaultAdminConfig, promutils.NewTestScope(), memStore)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), memStore)
assert.NoError(t, err)
mockClient.OnGetLaunchPlanMatch(
ctx,
Expand Down Expand Up @@ -435,6 +445,9 @@ type test struct {
func TestAdminLaunchPlanExecutorScenarios(t *testing.T) {
ctx := context.TODO()

adminConfig := defaultAdminConfig
adminConfig.CacheResyncDuration = config.Duration{Duration: time.Millisecond}

mockExecutionRespWithOutputs := &admin.Execution{
Closure: &admin.ExecutionClosure{
Phase: core.WorkflowExecution_SUCCEEDED,
Expand Down Expand Up @@ -546,7 +559,7 @@ func TestAdminLaunchPlanExecutorScenarios(t *testing.T) {
ComposedProtobufStore: pbStore,
ReferenceConstructor: &storageMocks.ReferenceConstructor{},
}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope(), storageClient)
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, adminConfig, promutils.NewTestScope(), storageClient)
assert.NoError(t, err)

iwMock := &mocks2.ItemWrapper{}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package launchplan

import (
"time"

ctrlConfig "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/config"
)

//go:generate pflags AdminConfig --default-var defaultAdminConfig
Expand All @@ -12,6 +15,9 @@ var (
Burst: 10,
MaxCacheSize: 10000,
Workers: 10,
CacheResyncDuration: config.Duration{
Duration: 30 * time.Second,
},
}

adminConfigSection = ctrlConfig.MustRegisterSubSection("admin-launcher", defaultAdminConfig)
Expand All @@ -31,6 +37,9 @@ type AdminConfig struct {
MaxCacheSize int `json:"cacheSize" pflag:",Maximum cache in terms of number of items stored."`

Workers int `json:"workers" pflag:",Number of parallel workers to work on the queue."`

// CacheResyncDuration defines the interval at which the admin launcher refreshes the launchplan cache.
CacheResyncDuration config.Duration `json:"cache-resync-duration" pflag:",Frequency of re-syncing launchplans within the auto refresh cache."`
}

func GetAdminConfig() *AdminConfig {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f4c108d

Please sign in to comment.