From eb101186885bd415b3e246ab35a9a7cbd31cd6c8 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Fri, 5 Apr 2024 09:41:19 -0700 Subject: [PATCH 1/3] WIP use ratelimiter config in webapi plugins Signed-off-by: Ketan Umare --- flyteplugins/go.mod | 2 +- .../go/tasks/pluginmachinery/internal/webapi/cache.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index 6e5ae9670a..5464f06c89 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -28,6 +28,7 @@ require ( golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 golang.org/x/net v0.22.0 golang.org/x/oauth2 v0.16.0 + golang.org/x/time v0.5.0 google.golang.org/api v0.155.0 google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 @@ -128,7 +129,6 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go index a305323dca..8b0b49e993 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -2,8 +2,9 @@ package webapi import ( "context" - + "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" + "time" "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core" @@ -161,6 +162,7 @@ func ToPluginPhase(s core.Phase) (Phase, error) { } func NewResourceCache(ctx context.Context, name string, client Client, cfg webapi.CachingConfig, + rateCfg webapi.RateLimiterConfig, scope promutils.Scope) (ResourceCache, error) { q := ResourceCache{ @@ -169,7 +171,10 @@ func NewResourceCache(ctx context.Context, name string, client Client, cfg webap } autoRefreshCache, err := cache.NewAutoRefreshCache(name, q.SyncResource, - workqueue.DefaultControllerRateLimiter(), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size, + workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(rateCfg.QPS), rateCfg.Burst)}, + ), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size, scope.NewSubScope("cache")) if err != nil { From 87a5e3ce985597b073ccd824f999b8ff4e663831 Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Fri, 5 Apr 2024 09:47:07 -0700 Subject: [PATCH 2/3] fixed ratelimiter config Signed-off-by: Ketan Umare --- .../go/tasks/pluginmachinery/internal/webapi/cache_test.go | 3 ++- flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go index 53e80cd391..d228e7954e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache_test.go @@ -21,13 +21,14 @@ func TestNewResourceCache(t *testing.T) { t.Run("Simple", func(t *testing.T) { c, err := NewResourceCache(context.Background(), "Cache1", &mocks.Client{}, webapi.CachingConfig{ Size: 10, - }, promutils.NewTestScope()) + }, webapi.RateLimiterConfig{QPS: 1, Burst: 1}, promutils.NewTestScope()) assert.NoError(t, err) assert.NotNil(t, c) }) t.Run("Error", func(t *testing.T) { _, err := NewResourceCache(context.Background(), "Cache1", &mocks.Client{}, webapi.CachingConfig{}, + webapi.RateLimiterConfig{}, promutils.NewTestScope()) assert.Error(t, err) }) diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go index a23f985fc6..9c98521897 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go @@ -191,7 +191,7 @@ func createRemotePlugin(pluginEntry webapi.PluginEntry, c clock.Clock) core.Plug } resourceCache, err := NewResourceCache(ctx, pluginEntry.ID, p, p.GetConfig().Caching, - iCtx.MetricsScope().NewSubScope("cache")) + p.GetConfig().ReadRateLimiter, iCtx.MetricsScope().NewSubScope("cache")) if err != nil { return nil, err From 144739e9be874c220863df20b373c2aa42e70a2a Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Fri, 5 Apr 2024 11:36:04 -0700 Subject: [PATCH 3/3] updated ratelimiter Signed-off-by: Ketan Umare --- flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go index 8b0b49e993..b13070534e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -2,9 +2,10 @@ package webapi import ( "context" + "time" + "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" - "time" "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"