diff --git a/common/quotas/collection.go b/common/quotas/collection.go index 3fa2b1d9257..59d78c3ae0e 100644 --- a/common/quotas/collection.go +++ b/common/quotas/collection.go @@ -25,13 +25,13 @@ import "sync" // Collection stores a map of limiters by key type Collection struct { mu sync.RWMutex - factory func(string) Limiter + factory LimiterFactory limiters map[string]Limiter } // NewCollection create a new limiter collection. // Given factory is called to create new individual limiter. -func NewCollection(factory func(string) Limiter) *Collection { +func NewCollection(factory LimiterFactory) *Collection { return &Collection{ factory: factory, limiters: make(map[string]Limiter), @@ -47,7 +47,7 @@ func (c *Collection) For(key string) Limiter { if !ok { // create a new limiter - newLimiter := c.factory(key) + newLimiter := c.factory.GetLimiter(key) // verify that it is needed and add to map c.mu.Lock() diff --git a/common/quotas/dynamicratelimiterfactory.go b/common/quotas/dynamicratelimiterfactory.go new file mode 100644 index 00000000000..dc7cc67864e --- /dev/null +++ b/common/quotas/dynamicratelimiterfactory.go @@ -0,0 +1,41 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package quotas + +type LimiterFactory interface { + GetLimiter(domain string) Limiter +} + +func NewDynamicRateLimiterFactory(rps RPSKeyFunc) LimiterFactory { + return dynamicRateLimiterFactory{ + rps: rps, + } +} + +type dynamicRateLimiterFactory struct { + rps RPSKeyFunc +} + +func (f dynamicRateLimiterFactory) GetLimiter(domain string) Limiter { + return NewDynamicRateLimiter(func() float64 { return f.rps(domain) }) +} diff --git a/common/quotas/global.go b/common/quotas/global.go index d9fe7442516..64124ba832e 100644 --- a/common/quotas/global.go +++ b/common/quotas/global.go @@ -41,8 +41,3 @@ func PerMember(service string, globalRPS, instanceRPS float64, resolver membersh avgQuota := math.Max(globalRPS/float64(memberCount), 1) return math.Min(avgQuota, instanceRPS) } - -// PerMemberDynamic is a dynamic variant (using RPSFunc) of PerMember -func PerMemberDynamic(service string, globalRPS, instanceRPS RPSFunc, resolver membership.Resolver) RPSFunc { - return func() float64 { return PerMember(service, globalRPS(), instanceRPS(), resolver) } -} diff --git a/common/quotas/global_test.go b/common/quotas/global_test.go index ec0bbd5ca22..fa0b8e0ea24 100644 --- a/common/quotas/global_test.go +++ b/common/quotas/global_test.go @@ -38,23 +38,16 @@ func Test_PerMember(t *testing.T) { // Invalid service - fallback to instanceRPS assert.Equal(t, 3.0, PerMember("X", 20.0, 3.0, resolver)) - assert.Equal(t, 3.0, PerMemberDynamic("X", rps(20.0), rps(3.0), resolver)()) // Invalid member count - fallback to instanceRPS assert.Equal(t, 3.0, PerMember("Y", 20.0, 3.0, resolver)) - assert.Equal(t, 3.0, PerMemberDynamic("Y", rps(20.0), rps(3.0), resolver)()) // GlobalRPS not provided - fallback to instanceRPS assert.Equal(t, 3.0, PerMember("A", 0, 3.0, resolver)) - assert.Equal(t, 3.0, PerMemberDynamic("A", rps(0.0), rps(3.0), resolver)()) // Calculate average per member RPS (prefer averaged global - lower) assert.Equal(t, 2.0, PerMember("A", 20.0, 3.0, resolver)) - assert.Equal(t, 2.0, PerMemberDynamic("A", rps(20.0), rps(3.0), resolver)()) // Calculate average per member RPS (prefer instanceRPS - lower) assert.Equal(t, 3.0, PerMember("A", 100.0, 3.0, resolver)) - assert.Equal(t, 3.0, PerMemberDynamic("A", rps(100.0), rps(3.0), resolver)()) } - -func rps(val float64) RPSFunc { return func() float64 { return val } } diff --git a/common/quotas/limiter_test.go b/common/quotas/limiter_test.go index b9df200d67b..1d3dfe2cda2 100644 --- a/common/quotas/limiter_test.go +++ b/common/quotas/limiter_test.go @@ -141,7 +141,7 @@ func newFixedRpsMultiStageRateLimiter(globalRps, domainRps float64) Policy { NewDynamicRateLimiter(func() float64 { return globalRps }), - NewCollection(DynamicRateLimiterFactory(func(domain string) float64 { + NewCollection(NewDynamicRateLimiterFactory(func(domain string) float64 { return domainRps })), ) diff --git a/common/resource/resourceImpl.go b/common/resource/resourceImpl.go index e0ec9e73dd7..8ad0d1d5c27 100644 --- a/common/resource/resourceImpl.go +++ b/common/resource/resourceImpl.go @@ -27,6 +27,7 @@ import ( "github.com/uber/cadence/common/dynamicconfig/configstore" csc "github.com/uber/cadence/common/dynamicconfig/configstore/config" + "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/taskvalidator" "github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate" @@ -60,7 +61,6 @@ import ( "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" persistenceClient "github.com/uber/cadence/common/persistence/client" - "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/service" ) @@ -179,12 +179,14 @@ func New( persistenceBean, err := persistenceClient.NewBeanFromFactory(persistenceClient.NewFactory( ¶ms.PersistenceConfig, - quotas.PerMemberDynamic( - serviceName, - serviceConfig.PersistenceGlobalMaxQPS.AsFloat64(), - serviceConfig.PersistenceMaxQPS.AsFloat64(), - membershipResolver, - ), + func() float64 { + return quotas.PerMember( + serviceName, + float64(serviceConfig.PersistenceGlobalMaxQPS()), + float64(serviceConfig.PersistenceMaxQPS()), + membershipResolver, + ) + }, params.ClusterMetadata.GetCurrentClusterName(), params.MetricsClient, logger, diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 9d0dba51308..2c30dbe07df 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -174,36 +174,39 @@ func NewWorkflowHandler( tokenSerializer: common.NewJSONTaskTokenSerializer(), userRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()), - quotas.NewCollection(func(domain string) quotas.Limiter { - return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic( - service.Frontend, - config.GlobalDomainUserRPS.AsFloat64(domain), - config.MaxDomainUserRPSPerInstance.AsFloat64(domain), - resource.GetMembershipResolver(), - )) - }), + quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( + func(domain string) float64 { + return quotas.PerMember( + service.Frontend, + float64(config.GlobalDomainUserRPS(domain)), + float64(config.MaxDomainUserRPSPerInstance(domain)), + resource.GetMembershipResolver(), + ) + })), ), workerRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()), - quotas.NewCollection(func(domain string) quotas.Limiter { - return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic( - service.Frontend, - config.GlobalDomainWorkerRPS.AsFloat64(domain), - config.MaxDomainWorkerRPSPerInstance.AsFloat64(domain), - resource.GetMembershipResolver(), - )) - }), + quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( + func(domain string) float64 { + return quotas.PerMember( + service.Frontend, + float64(config.GlobalDomainWorkerRPS(domain)), + float64(config.MaxDomainWorkerRPSPerInstance(domain)), + resource.GetMembershipResolver(), + ) + })), ), visibilityRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()), - quotas.NewCollection(func(domain string) quotas.Limiter { - return quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic( - service.Frontend, - config.GlobalDomainVisibilityRPS.AsFloat64(domain), - config.MaxDomainVisibilityRPSPerInstance.AsFloat64(domain), - resource.GetMembershipResolver(), - )) - }), + quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( + func(domain string) float64 { + return quotas.PerMember( + service.Frontend, + float64(config.GlobalDomainVisibilityRPS(domain)), + float64(config.MaxDomainVisibilityRPSPerInstance(domain)), + resource.GetMembershipResolver(), + ) + })), ), versionChecker: versionChecker, domainHandler: domainHandler, diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index d549b77d67a..9b22d195876 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -194,18 +194,22 @@ func NewEngineWithShardContext( publicClient, shard.GetConfig().NumArchiveSystemWorkflows, quotas.NewDynamicRateLimiter(config.ArchiveRequestRPS.AsFloat64()), - quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic( - service.History, - config.ArchiveInlineHistoryGlobalRPS.AsFloat64(), - config.ArchiveInlineHistoryRPS.AsFloat64(), - shard.GetService().GetMembershipResolver(), - )), - quotas.NewDynamicRateLimiter(quotas.PerMemberDynamic( - service.History, - config.ArchiveInlineVisibilityGlobalRPS.AsFloat64(), - config.ArchiveInlineVisibilityRPS.AsFloat64(), - shard.GetService().GetMembershipResolver(), - )), + quotas.NewDynamicRateLimiter(func() float64 { + return quotas.PerMember( + service.History, + float64(config.ArchiveInlineHistoryGlobalRPS()), + float64(config.ArchiveInlineHistoryRPS()), + shard.GetService().GetMembershipResolver(), + ) + }), + quotas.NewDynamicRateLimiter(func() float64 { + return quotas.PerMember( + service.History, + float64(config.ArchiveInlineVisibilityGlobalRPS()), + float64(config.ArchiveInlineVisibilityRPS()), + shard.GetService().GetMembershipResolver(), + ) + }), shard.GetService().GetArchiverProvider(), config.AllowArchivingIncompleteHistory, ), diff --git a/service/history/task/priority_assigner.go b/service/history/task/priority_assigner.go index d677c227331..c7249cb2520 100644 --- a/service/history/task/priority_assigner.go +++ b/service/history/task/priority_assigner.go @@ -68,9 +68,11 @@ func NewPriorityAssigner( config: config, logger: logger, scope: metricClient.Scope(metrics.TaskPriorityAssignerScope), - rateLimiters: quotas.NewCollection(func(domain string) quotas.Limiter { - return quotas.NewDynamicRateLimiter(config.TaskProcessRPS.AsFloat64(domain)) - }), + rateLimiters: quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( + func(domain string) float64 { + return float64(config.TaskProcessRPS(domain)) + }, + )), } } diff --git a/service/matching/handler.go b/service/matching/handler.go index 066bbdfa02c..ac29de9d457 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -85,7 +85,7 @@ func NewHandler( metricsClient: metricsClient, userRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()), - quotas.NewCollection(quotas.DynamicRateLimiterFactory( + quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( func(domain string) float64 { domainRPS := float64(config.DomainUserRPS(domain)) if domainRPS > 0 { @@ -97,7 +97,7 @@ func NewHandler( ), workerRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()), - quotas.NewCollection(quotas.DynamicRateLimiterFactory( + quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( func(domain string) float64 { domainRPS := float64(config.DomainWorkerRPS(domain)) if domainRPS > 0 {