Skip to content

Commit

Permalink
Create and use a limiter struct instead of just passing a function - …
Browse files Browse the repository at this point in the history
…will create dedicated factories later (#5454)
  • Loading branch information
jakobht authored Nov 22, 2023
1 parent 5f9eb08 commit cf07398
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 64 deletions.
6 changes: 3 additions & 3 deletions common/quotas/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import "sync"
// Collection stores a map of limiters by key
type Collection struct {
mu sync.RWMutex
factory func(string) Limiter
factory LimiterFactory
limiters map[string]Limiter
}

// NewCollection create a new limiter collection.
// Given factory is called to create new individual limiter.
func NewCollection(factory func(string) Limiter) *Collection {
func NewCollection(factory LimiterFactory) *Collection {
return &Collection{
factory: factory,
limiters: make(map[string]Limiter),
Expand All @@ -47,7 +47,7 @@ func (c *Collection) For(key string) Limiter {

if !ok {
// create a new limiter
newLimiter := c.factory(key)
newLimiter := c.factory.GetLimiter(key)

// verify that it is needed and add to map
c.mu.Lock()
Expand Down
41 changes: 41 additions & 0 deletions common/quotas/dynamicratelimiterfactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package quotas

type LimiterFactory interface {
GetLimiter(domain string) Limiter
}

func NewDynamicRateLimiterFactory(rps RPSKeyFunc) LimiterFactory {
return dynamicRateLimiterFactory{
rps: rps,
}
}

type dynamicRateLimiterFactory struct {
rps RPSKeyFunc
}

func (f dynamicRateLimiterFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 { return f.rps(domain) })
}
5 changes: 0 additions & 5 deletions common/quotas/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,3 @@ func PerMember(service string, globalRPS, instanceRPS float64, resolver membersh
avgQuota := math.Max(globalRPS/float64(memberCount), 1)
return math.Min(avgQuota, instanceRPS)
}

// PerMemberDynamic is a dynamic variant (using RPSFunc) of PerMember
func PerMemberDynamic(service string, globalRPS, instanceRPS RPSFunc, resolver membership.Resolver) RPSFunc {
return func() float64 { return PerMember(service, globalRPS(), instanceRPS(), resolver) }
}
7 changes: 0 additions & 7 deletions common/quotas/global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,16 @@ func Test_PerMember(t *testing.T) {

// Invalid service - fallback to instanceRPS
assert.Equal(t, 3.0, PerMember("X", 20.0, 3.0, resolver))
assert.Equal(t, 3.0, PerMemberDynamic("X", rps(20.0), rps(3.0), resolver)())

// Invalid member count - fallback to instanceRPS
assert.Equal(t, 3.0, PerMember("Y", 20.0, 3.0, resolver))
assert.Equal(t, 3.0, PerMemberDynamic("Y", rps(20.0), rps(3.0), resolver)())

// GlobalRPS not provided - fallback to instanceRPS
assert.Equal(t, 3.0, PerMember("A", 0, 3.0, resolver))
assert.Equal(t, 3.0, PerMemberDynamic("A", rps(0.0), rps(3.0), resolver)())

// Calculate average per member RPS (prefer averaged global - lower)
assert.Equal(t, 2.0, PerMember("A", 20.0, 3.0, resolver))
assert.Equal(t, 2.0, PerMemberDynamic("A", rps(20.0), rps(3.0), resolver)())

// Calculate average per member RPS (prefer instanceRPS - lower)
assert.Equal(t, 3.0, PerMember("A", 100.0, 3.0, resolver))
assert.Equal(t, 3.0, PerMemberDynamic("A", rps(100.0), rps(3.0), resolver)())
}

func rps(val float64) RPSFunc { return func() float64 { return val } }
2 changes: 1 addition & 1 deletion common/quotas/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func newFixedRpsMultiStageRateLimiter(globalRps, domainRps float64) Policy {
NewDynamicRateLimiter(func() float64 {
return globalRps
}),
NewCollection(DynamicRateLimiterFactory(func(domain string) float64 {
NewCollection(NewDynamicRateLimiterFactory(func(domain string) float64 {
return domainRps
})),
)
Expand Down
16 changes: 9 additions & 7 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/uber/cadence/common/dynamicconfig/configstore"
csc "github.com/uber/cadence/common/dynamicconfig/configstore/config"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/taskvalidator"

"github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate"
Expand Down Expand Up @@ -60,7 +61,6 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
persistenceClient "github.com/uber/cadence/common/persistence/client"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service"
)

Expand Down Expand Up @@ -179,12 +179,14 @@ func New(

persistenceBean, err := persistenceClient.NewBeanFromFactory(persistenceClient.NewFactory(
&params.PersistenceConfig,
quotas.PerMemberDynamic(
serviceName,
serviceConfig.PersistenceGlobalMaxQPS.AsFloat64(),
serviceConfig.PersistenceMaxQPS.AsFloat64(),
membershipResolver,
),
func() float64 {
return quotas.PerMember(
serviceName,
float64(serviceConfig.PersistenceGlobalMaxQPS()),
float64(serviceConfig.PersistenceMaxQPS()),
membershipResolver,
)
},
params.ClusterMetadata.GetCurrentClusterName(),
params.MetricsClient,
logger,
Expand Down
51 changes: 27 additions & 24 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,36 +174,39 @@ func NewWorkflowHandler(
tokenSerializer: common.NewJSONTaskTokenSerializer(),
userRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
quotas.NewCollection(func(domain string) quotas.Limiter {
return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
service.Frontend,
config.GlobalDomainUserRPS.AsFloat64(domain),
config.MaxDomainUserRPSPerInstance.AsFloat64(domain),
resource.GetMembershipResolver(),
))
}),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainUserRPS(domain)),
float64(config.MaxDomainUserRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
),
workerRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
quotas.NewCollection(func(domain string) quotas.Limiter {
return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
service.Frontend,
config.GlobalDomainWorkerRPS.AsFloat64(domain),
config.MaxDomainWorkerRPSPerInstance.AsFloat64(domain),
resource.GetMembershipResolver(),
))
}),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainWorkerRPS(domain)),
float64(config.MaxDomainWorkerRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
),
visibilityRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()),
quotas.NewCollection(func(domain string) quotas.Limiter {
return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
service.Frontend,
config.GlobalDomainVisibilityRPS.AsFloat64(domain),
config.MaxDomainVisibilityRPSPerInstance.AsFloat64(domain),
resource.GetMembershipResolver(),
))
}),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainVisibilityRPS(domain)),
float64(config.MaxDomainVisibilityRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
),
versionChecker: versionChecker,
domainHandler: domainHandler,
Expand Down
28 changes: 16 additions & 12 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,22 @@ func NewEngineWithShardContext(
publicClient,
shard.GetConfig().NumArchiveSystemWorkflows,
quotas.NewDynamicRateLimiter(config.ArchiveRequestRPS.AsFloat64()),
quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
service.History,
config.ArchiveInlineHistoryGlobalRPS.AsFloat64(),
config.ArchiveInlineHistoryRPS.AsFloat64(),
shard.GetService().GetMembershipResolver(),
)),
quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic(
service.History,
config.ArchiveInlineVisibilityGlobalRPS.AsFloat64(),
config.ArchiveInlineVisibilityRPS.AsFloat64(),
shard.GetService().GetMembershipResolver(),
)),
quotas.NewDynamicRateLimiter(func() float64 {
return quotas.PerMember(
service.History,
float64(config.ArchiveInlineHistoryGlobalRPS()),
float64(config.ArchiveInlineHistoryRPS()),
shard.GetService().GetMembershipResolver(),
)
}),
quotas.NewDynamicRateLimiter(func() float64 {
return quotas.PerMember(
service.History,
float64(config.ArchiveInlineVisibilityGlobalRPS()),
float64(config.ArchiveInlineVisibilityRPS()),
shard.GetService().GetMembershipResolver(),
)
}),
shard.GetService().GetArchiverProvider(),
config.AllowArchivingIncompleteHistory,
),
Expand Down
8 changes: 5 additions & 3 deletions service/history/task/priority_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ func NewPriorityAssigner(
config: config,
logger: logger,
scope: metricClient.Scope(metrics.TaskPriorityAssignerScope),
rateLimiters: quotas.NewCollection(func(domain string) quotas.Limiter {
return quotas.NewDynamicRateLimiter(config.TaskProcessRPS.AsFloat64(domain))
}),
rateLimiters: quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return float64(config.TaskProcessRPS(domain))
},
)),
}
}

Expand Down
4 changes: 2 additions & 2 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewHandler(
metricsClient: metricsClient,
userRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
quotas.NewCollection(quotas.DynamicRateLimiterFactory(
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
domainRPS := float64(config.DomainUserRPS(domain))
if domainRPS > 0 {
Expand All @@ -97,7 +97,7 @@ func NewHandler(
),
workerRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
quotas.NewCollection(quotas.DynamicRateLimiterFactory(
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
domainRPS := float64(config.DomainWorkerRPS(domain))
if domainRPS > 0 {
Expand Down

0 comments on commit cf07398

Please sign in to comment.