Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic ratelimiter factories #5455

Merged
merged 3 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions common/quotas/dynamicratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ type DynamicRateLimiter struct {
rl *RateLimiter
}

// DynamicRateLimiterFactory creates a factory function for creating DynamicRateLimiters
func DynamicRateLimiterFactory(rps RPSKeyFunc) func(string) Limiter {
return func(key string) Limiter {
return NewDynamicRateLimiter(func() float64 { return rps(key) })
}
}

// NewDynamicRateLimiter returns a rate limiter which handles dynamic config
func NewDynamicRateLimiter(rps RPSFunc) *DynamicRateLimiter {
initialRps := rps()
Expand Down
13 changes: 10 additions & 3 deletions common/quotas/dynamicratelimiterfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,27 @@

package quotas

import "github.com/uber/cadence/common/dynamicconfig"

// LimiterFactory is used to create a Limiter for a given domain
type LimiterFactory interface {
// GetLimiter returns a new Limiter for the given domain
GetLimiter(domain string) Limiter
}

func NewDynamicRateLimiterFactory(rps RPSKeyFunc) LimiterFactory {
// NewSimpleDynamicRateLimiterFactory creates a new LimiterFactory which creates
// a new DynamicRateLimiter for each domain, the RPS for the DynamicRateLimiter is given by the dynamic config
func NewSimpleDynamicRateLimiterFactory(rps dynamicconfig.IntPropertyFnWithDomainFilter) LimiterFactory {
return dynamicRateLimiterFactory{
rps: rps,
}
}

type dynamicRateLimiterFactory struct {
rps RPSKeyFunc
rps dynamicconfig.IntPropertyFnWithDomainFilter
}

// GetLimiter returns a new Limiter for the given domain
func (f dynamicRateLimiterFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 { return f.rps(domain) })
return NewDynamicRateLimiter(func() float64 { return float64(f.rps(domain)) })
}
60 changes: 60 additions & 0 deletions common/quotas/fallbackdynamicratelimiterfactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package quotas

import "github.com/uber/cadence/common/dynamicconfig"

// LimiterFactory is used to create a Limiter for a given domain
// the created Limiter will use the primary dynamic config if it is set
// otherwise it will use the secondary dynamic config
func NewFallbackDynamicRateLimiterFactory(
primary dynamicconfig.IntPropertyFnWithDomainFilter,
secondary dynamicconfig.IntPropertyFn,
) LimiterFactory {
return fallbackDynamicRateLimiterFactory{
primary: primary,
secondary: secondary,
}
}

type fallbackDynamicRateLimiterFactory struct {
primary dynamicconfig.IntPropertyFnWithDomainFilter
// secondary is used when primary is not set
secondary dynamicconfig.IntPropertyFn
}

// GetLimiter returns a new Limiter for the given domain
func (f fallbackDynamicRateLimiterFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 {
return limitWithFallback(
float64(f.primary(domain)),
float64(f.secondary()))
})
}

func limitWithFallback(primary, secondary float64) float64 {
if primary > 0 {
return primary
}
return secondary
}
58 changes: 58 additions & 0 deletions common/quotas/fallbackdynamicratelimiterfactory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package quotas

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/uber/cadence/common/dynamicconfig"
)

func TestNewFallbackDynamicRateLimiterFactory(t *testing.T) {
factory := NewFallbackDynamicRateLimiterFactory(
func(string) int { return 2 },
func(opts ...dynamicconfig.FilterOption) int { return 100 },
)

limiter := factory.GetLimiter("TestDomainName")

// The limiter should accept 2 requests per second
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, false, limiter.Allow())
}

func TestNewFallbackDynamicRateLimiterFactoryFallback(t *testing.T) {
factory := NewFallbackDynamicRateLimiterFactory(
func(string) int { return 0 },
func(opts ...dynamicconfig.FilterOption) int { return 2 },
)

limiter := factory.GetLimiter("TestDomainName")

// The limiter should accept 2 requests per second
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, false, limiter.Allow())
}
4 changes: 2 additions & 2 deletions common/quotas/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ func BenchmarkMultiStageRateLimiter1000Domains(b *testing.B) {
}
}

func newFixedRpsMultiStageRateLimiter(globalRps, domainRps float64) Policy {
func newFixedRpsMultiStageRateLimiter(globalRps float64, domainRps int) Policy {
return NewMultiStageRateLimiter(
NewDynamicRateLimiter(func() float64 {
return globalRps
}),
NewCollection(NewDynamicRateLimiterFactory(func(domain string) float64 {
NewCollection(NewSimpleDynamicRateLimiterFactory(func(domain string) int {
return domainRps
})),
)
Expand Down
37 changes: 37 additions & 0 deletions common/quotas/global.go → common/quotas/permember.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package quotas
import (
"math"

"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/membership"
)

Expand All @@ -41,3 +42,39 @@ func PerMember(service string, globalRPS, instanceRPS float64, resolver membersh
avgQuota := math.Max(globalRPS/float64(memberCount), 1)
return math.Min(avgQuota, instanceRPS)
}

// NewPerMemberDynamicRateLimiterFactory creates a new LimiterFactory which creates
// a new DynamicRateLimiter for each domain, the RPS for the DynamicRateLimiter is given
// by the globalRPS and averaged by member count for a given service.
// instanceRPS is used as a fallback if globalRPS is not provided.
func NewPerMemberDynamicRateLimiterFactory(
service string,
globalRPS dynamicconfig.IntPropertyFnWithDomainFilter,
instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter,
resolver membership.Resolver,
) LimiterFactory {
return perMemberFactory{
service: service,
globalRPS: globalRPS,
instanceRPS: instanceRPS,
resolver: resolver,
}
}

type perMemberFactory struct {
service string
globalRPS dynamicconfig.IntPropertyFnWithDomainFilter
instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter
resolver membership.Resolver
}

func (f perMemberFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 {
return PerMember(
f.service,
float64(f.globalRPS(domain)),
float64(f.instanceRPS(domain)),
f.resolver,
)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
func Test_PerMember(t *testing.T) {
ctrl := gomock.NewController(t)
resolver := membership.NewMockResolver(ctrl)
resolver.EXPECT().MemberCount("A").Return(10, nil).AnyTimes()
resolver.EXPECT().MemberCount("X").Return(0, assert.AnError).AnyTimes()
resolver.EXPECT().MemberCount("Y").Return(0, nil).AnyTimes()
resolver.EXPECT().MemberCount("A").Return(10, nil).MinTimes(1)
resolver.EXPECT().MemberCount("X").Return(0, assert.AnError).MinTimes(1)
resolver.EXPECT().MemberCount("Y").Return(0, nil).MinTimes(1)

// Invalid service - fallback to instanceRPS
assert.Equal(t, 3.0, PerMember("X", 20.0, 3.0, resolver))
Expand All @@ -51,3 +51,23 @@ func Test_PerMember(t *testing.T) {
// Calculate average per member RPS (prefer instanceRPS - lower)
assert.Equal(t, 3.0, PerMember("A", 100.0, 3.0, resolver))
}

func Test_PerMemberFactory(t *testing.T) {
ctrl := gomock.NewController(t)
resolver := membership.NewMockResolver(ctrl)
resolver.EXPECT().MemberCount("A").Return(10, nil).MinTimes(1)

factory := NewPerMemberDynamicRateLimiterFactory(
"A",
func(string) int { return 20 },
func(string) int { return 3 },
resolver,
)

limiter := factory.GetLimiter("TestDomainName")

// The limit is 20 and there are 10 instances, so the per member limit is 2
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, false, limiter.Allow())
}
45 changes: 18 additions & 27 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,39 +174,30 @@ func NewWorkflowHandler(
tokenSerializer: common.NewJSONTaskTokenSerializer(),
userRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainUserRPS(domain)),
float64(config.MaxDomainUserRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
config.GlobalDomainUserRPS,
config.MaxDomainUserRPSPerInstance,
resource.GetMembershipResolver(),
)),
),
workerRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainWorkerRPS(domain)),
float64(config.MaxDomainWorkerRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
config.GlobalDomainWorkerRPS,
config.MaxDomainWorkerRPSPerInstance,
resource.GetMembershipResolver(),
)),
),
visibilityRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainVisibilityRPS(domain)),
float64(config.MaxDomainVisibilityRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
config.GlobalDomainVisibilityRPS,
config.MaxDomainVisibilityRPSPerInstance,
resource.GetMembershipResolver(),
)),
),
versionChecker: versionChecker,
domainHandler: domainHandler,
Expand Down
6 changes: 2 additions & 4 deletions service/history/task/priority_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ func NewPriorityAssigner(
config: config,
logger: logger,
scope: metricClient.Scope(metrics.TaskPriorityAssignerScope),
rateLimiters: quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return float64(config.TaskProcessRPS(domain))
},
rateLimiters: quotas.NewCollection(quotas.NewSimpleDynamicRateLimiterFactory(
config.TaskProcessRPS,
)),
}
}
Expand Down
26 changes: 8 additions & 18 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,27 +85,17 @@ func NewHandler(
metricsClient: metricsClient,
userRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
domainRPS := float64(config.DomainUserRPS(domain))
if domainRPS > 0 {
return domainRPS
}
// if domain rps not set, use host rps to keep the old behavior
return float64(config.UserRPS())
})),
quotas.NewCollection(quotas.NewFallbackDynamicRateLimiterFactory(
config.DomainUserRPS,
config.UserRPS,
)),
),
workerRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
domainRPS := float64(config.DomainWorkerRPS(domain))
if domainRPS > 0 {
return domainRPS
}
// if domain rps not set, use host rps to keep the old behavior
return float64(config.WorkerRPS())
})),
quotas.NewCollection(quotas.NewFallbackDynamicRateLimiterFactory(
config.DomainWorkerRPS,
config.WorkerRPS,
)),
),
engine: engine,
logger: logger,
Expand Down