Skip to content

Commit

Permalink
Use ratelimiter config in webapi plugins (#5190)
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Umare <[email protected]>
Co-authored-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 and kumare3 authored Apr 5, 2024
1 parent 2eede89 commit c8be3e4
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8
golang.org/x/net v0.22.0
golang.org/x/oauth2 v0.16.0
golang.org/x/time v0.5.0
google.golang.org/api v0.155.0
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
Expand Down Expand Up @@ -128,7 +129,6 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package webapi

import (
"context"
"time"

"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
Expand Down Expand Up @@ -161,6 +163,7 @@ func ToPluginPhase(s core.Phase) (Phase, error) {
}

func NewResourceCache(ctx context.Context, name string, client Client, cfg webapi.CachingConfig,
rateCfg webapi.RateLimiterConfig,
scope promutils.Scope) (ResourceCache, error) {

q := ResourceCache{
Expand All @@ -169,7 +172,10 @@ func NewResourceCache(ctx context.Context, name string, client Client, cfg webap
}

autoRefreshCache, err := cache.NewAutoRefreshCache(name, q.SyncResource,
workqueue.DefaultControllerRateLimiter(), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size,
workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(rateCfg.QPS), rateCfg.Burst)},
), cfg.ResyncInterval.Duration, cfg.Workers, cfg.Size,
scope.NewSubScope("cache"))

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ func TestNewResourceCache(t *testing.T) {
t.Run("Simple", func(t *testing.T) {
c, err := NewResourceCache(context.Background(), "Cache1", &mocks.Client{}, webapi.CachingConfig{
Size: 10,
}, promutils.NewTestScope())
}, webapi.RateLimiterConfig{QPS: 1, Burst: 1}, promutils.NewTestScope())
assert.NoError(t, err)
assert.NotNil(t, c)
})

t.Run("Error", func(t *testing.T) {
_, err := NewResourceCache(context.Background(), "Cache1", &mocks.Client{}, webapi.CachingConfig{},
webapi.RateLimiterConfig{},
promutils.NewTestScope())
assert.Error(t, err)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func createRemotePlugin(pluginEntry webapi.PluginEntry, c clock.Clock) core.Plug
}

resourceCache, err := NewResourceCache(ctx, pluginEntry.ID, p, p.GetConfig().Caching,
iCtx.MetricsScope().NewSubScope("cache"))
p.GetConfig().ReadRateLimiter, iCtx.MetricsScope().NewSubScope("cache"))

if err != nil {
return nil, err
Expand Down

0 comments on commit c8be3e4

Please sign in to comment.