Skip to content

Commit

Permalink
Change downscale detection
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Nov 22, 2024
1 parent ffbd993 commit 5af33e8
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 20 deletions.
23 changes: 23 additions & 0 deletions common/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type FloatPropertyFn func(opts ...FilterOption) float64
// FloatPropertyFnWithShardIDFilter is a wrapper to get float property from dynamic config with shardID as filter
type FloatPropertyFnWithShardIDFilter func(shardID int) float64

// FloatPropertyFnWithTaskListInfoFilters is a wrapper to get float property from dynamic config with three filters: domain, taskList, taskType
type FloatPropertyFnWithTaskListInfoFilters func(domain string, taskList string, taskType int) float64

// DurationPropertyFn is a wrapper to get duration property from dynamic config
type DurationPropertyFn func(opts ...FilterOption) time.Duration

Expand Down Expand Up @@ -302,6 +305,26 @@ func (c *Collection) GetFloat64PropertyFilteredByShardID(key FloatKey) FloatProp
}
}

// GetFloat64PropertyFilteredByTaskListInfo returns a getter for the float64
// property identified by key, filtered by domain, task list and task type.
// If the client lookup returns an error, the getter logs it and returns the
// key's default float value.
func (c *Collection) GetFloat64PropertyFilteredByTaskListInfo(key FloatKey) FloatPropertyFnWithTaskListInfoFilters {
	return func(domain string, taskList string, taskType int) float64 {
		filterMap := c.toFilterMap(DomainFilter(domain), TaskListFilter(taskList), TaskTypeFilter(taskType))
		value, lookupErr := c.client.GetFloatValue(key, filterMap)
		if lookupErr == nil {
			return value
		}
		// Lookup failed: record the error and fall back to the key's default.
		c.logError(key, filterMap, lookupErr)
		return key.DefaultFloat()
	}
}

// GetDurationProperty gets property and asserts that it's a duration
func (c *Collection) GetDurationProperty(key DurationKey) DurationPropertyFn {
return func(opts ...FilterOption) time.Duration {
Expand Down
15 changes: 8 additions & 7 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,6 @@ const (
MatchingForwarderMaxChildrenPerNode

MatchingPartitionUpscaleRPS
MatchingPartitionDownscaleRPS

// key for history

Expand Down Expand Up @@ -2249,6 +2248,8 @@ const (
// Default value: 0.5
HistoryGlobalRatelimiterNewDataWeight

MatchingPartitionDownscaleFactor

// LastFloatKey must be the last one in this const group
LastFloatKey
)
Expand Down Expand Up @@ -3293,12 +3294,6 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "MatchingPartitionUpscaleRPS is the threshold of adding tasks RPS per partition to trigger upscale",
DefaultValue: 200,
},
MatchingPartitionDownscaleRPS: {
KeyName: "matching.partitionDownscaleRPS",
Filters: []Filter{DomainName, TaskListName, TaskType},
Description: "MatchingPartitionDownscaleRPS is the threshold of adding tasks RPS per partition to trigger downscale",
DefaultValue: 100,
},
HistoryRPS: {
KeyName: "history.rps",
Description: "HistoryRPS is request rate per second for each history host",
Expand Down Expand Up @@ -4552,6 +4547,12 @@ var FloatKeys = map[FloatKey]DynamicFloat{
Description: "HistoryGlobalRatelimiterNewDataWeight defines how much weight to give each host's newest data, per update. Must be between 0 and 1, higher values match new values more closely after a single update",
DefaultValue: 0.5,
},
MatchingPartitionDownscaleFactor: {
KeyName: "matching.partitionDownscaleFactor",
Description: "MatchingPartitionDownscaleFactor introduces hysteresis to prevent oscillation by setting a lower QPS threshold for downscaling, ensuring partitions are only removed when the load decreases significantly below the capacity of fewer partitions.",
Filters: []Filter{DomainName, TaskListName, TaskType},
DefaultValue: 0.75,
},
}

var StringKeys = map[StringKey]DynamicString{
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2632,6 +2632,8 @@ const (
TaskListReadWritePartitionMismatchGauge
TaskListPollerPartitionMismatchGauge
EstimatedAddTaskQPSGauge
TaskListPartitionUpscaleThresholdGauge
TaskListPartitionDownscaleThresholdGauge

NumMatchingMetrics
)
Expand Down Expand Up @@ -3322,6 +3324,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
TaskListReadWritePartitionMismatchGauge: {metricName: "tasklist_read_write_partition_mismatch", metricType: Gauge},
TaskListPollerPartitionMismatchGauge: {metricName: "tasklist_poller_partition_mismatch", metricType: Gauge},
EstimatedAddTaskQPSGauge: {metricName: "estimated_add_task_qps_per_tl", metricType: Gauge},
TaskListPartitionUpscaleThresholdGauge: {metricName: "tasklist_partition_upscale_threshold", metricType: Gauge},
TaskListPartitionDownscaleThresholdGauge: {metricName: "tasklist_partition_downscale_threshold", metricType: Gauge},
},
Worker: {
ReplicatorMessages: {metricName: "replicator_messages"},
Expand Down
6 changes: 3 additions & 3 deletions service/matching/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type (
LocalTaskWaitTime dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
EnableGetNumberOfPartitionsFromCache dynamicconfig.BoolPropertyFnWithTaskListInfoFilters
PartitionUpscaleRPS dynamicconfig.IntPropertyFnWithTaskListInfoFilters
PartitionDownscaleRPS dynamicconfig.IntPropertyFnWithTaskListInfoFilters
PartitionDownscaleFactor dynamicconfig.FloatPropertyFnWithTaskListInfoFilters
PartitionUpscaleSustainedDuration dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
PartitionDownscaleSustainedDuration dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
AdaptiveScalerUpdateInterval dynamicconfig.DurationPropertyFnWithTaskListInfoFilters
Expand Down Expand Up @@ -116,7 +116,7 @@ type (
LocalPollWaitTime func() time.Duration
LocalTaskWaitTime func() time.Duration
PartitionUpscaleRPS func() int
PartitionDownscaleRPS func() int
PartitionDownscaleFactor func() float64
PartitionUpscaleSustainedDuration func() time.Duration
PartitionDownscaleSustainedDuration func() time.Duration
AdaptiveScalerUpdateInterval func() time.Duration
Expand Down Expand Up @@ -179,7 +179,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string, getIsolationGroups
LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime),
LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime),
PartitionUpscaleRPS: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionUpscaleRPS),
PartitionDownscaleRPS: dc.GetIntPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionDownscaleRPS),
PartitionDownscaleFactor: dc.GetFloat64PropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionDownscaleFactor),
PartitionUpscaleSustainedDuration: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionUpscaleSustainedDuration),
PartitionDownscaleSustainedDuration: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingPartitionDownscaleSustainedDuration),
AdaptiveScalerUpdateInterval: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.MatchingAdaptiveScalerUpdateInterval),
Expand Down
4 changes: 3 additions & 1 deletion service/matching/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestNewConfig(t *testing.T) {
"EnableTasklistOwnershipGuard": {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false},
"EnableGetNumberOfPartitionsFromCache": {dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache, false},
"PartitionUpscaleRPS": {dynamicconfig.MatchingPartitionUpscaleRPS, 30},
"PartitionDownscaleRPS": {dynamicconfig.MatchingPartitionDownscaleRPS, 31},
"PartitionDownscaleFactor": {dynamicconfig.MatchingPartitionDownscaleFactor, 31.0},
"PartitionUpscaleSustainedDuration": {dynamicconfig.MatchingPartitionUpscaleSustainedDuration, time.Duration(32)},
"PartitionDownscaleSustainedDuration": {dynamicconfig.MatchingPartitionDownscaleSustainedDuration, time.Duration(33)},
"AdaptiveScalerUpdateInterval": {dynamicconfig.MatchingAdaptiveScalerUpdateInterval, time.Duration(34)},
Expand Down Expand Up @@ -155,6 +155,8 @@ func getValue(f *reflect.Value) interface{} {
return fn()
case dynamicconfig.StringPropertyFn:
return fn()
case dynamicconfig.FloatPropertyFnWithTaskListInfoFilters:
return fn("domain", "tasklist", int(types.TaskListTypeDecision))
case func() []string:
return fn()
default:
Expand Down
11 changes: 5 additions & 6 deletions service/matching/tasklist/adaptive_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (a *adaptiveScalerImpl) run() {
return
}
qps := a.qpsTracker.QPS()
a.scope.UpdateGauge(metrics.EstimatedAddTaskQPSGauge, qps)
partitionConfig := a.getPartitionConfig()
// adjust the number of write partitions based on qps
numWritePartitions := a.adjustWritePartitions(qps, partitionConfig.NumWritePartitions)
Expand Down Expand Up @@ -168,11 +167,11 @@ func (a *adaptiveScalerImpl) getPartitionConfig() *types.TaskListPartitionConfig

func (a *adaptiveScalerImpl) adjustWritePartitions(qps float64, numWritePartitions int32) int32 {
upscaleThreshold := float64(a.config.PartitionUpscaleRPS())
downscaleThreshold := float64(a.config.PartitionDownscaleRPS())
if downscaleThreshold > upscaleThreshold {
downscaleThreshold = upscaleThreshold
a.logger.Warn("downscale threshold is larger than upscale threshold, use upscale threshold for downscale threshold instead")
}
downscaleFactor := a.config.PartitionDownscaleFactor()
downscaleThreshold := float64(numWritePartitions-1) * upscaleThreshold * downscaleFactor / float64(numWritePartitions)
a.scope.UpdateGauge(metrics.EstimatedAddTaskQPSGauge, qps)
a.scope.UpdateGauge(metrics.TaskListPartitionUpscaleThresholdGauge, upscaleThreshold)
a.scope.UpdateGauge(metrics.TaskListPartitionDownscaleThresholdGauge, downscaleThreshold)

result := numWritePartitions
if qps > upscaleThreshold {
Expand Down
31 changes: 30 additions & 1 deletion service/matching/tasklist/adaptive_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,35 @@ func TestAdaptiveScalerRun(t *testing.T) {
},
cycles: 2,
},
{
name: "overload but no fluctuation",
mockSetup: func(deps *mockAdaptiveScalerDeps) {
// overload start
deps.mockQPSTracker.EXPECT().QPS().Return(210.0)
deps.mockManager.EXPECT().TaskListPartitionConfig().Return(nil)

// overload passing sustained period
deps.mockQPSTracker.EXPECT().QPS().Return(210.0)
deps.mockManager.EXPECT().TaskListPartitionConfig().Return(nil)
deps.mockManager.EXPECT().UpdateTaskListPartitionConfig(gomock.Any(), &types.TaskListPartitionConfig{
NumReadPartitions: 2,
NumWritePartitions: 2,
}).Return(nil)

// no longer overloaded for 1 partition, but don't scale down, to avoid fluctuation
deps.mockQPSTracker.EXPECT().QPS().Return(190.0)
deps.mockManager.EXPECT().TaskListPartitionConfig().Return(&types.TaskListPartitionConfig{
NumReadPartitions: 2,
NumWritePartitions: 2,
})
deps.mockQPSTracker.EXPECT().QPS().Return(190.0)
deps.mockManager.EXPECT().TaskListPartitionConfig().Return(&types.TaskListPartitionConfig{
NumReadPartitions: 2,
NumWritePartitions: 2,
})
},
cycles: 4,
},
}

for _, tc := range testCases {
Expand All @@ -219,7 +248,7 @@ func TestAdaptiveScalerRun(t *testing.T) {
require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingEnableAdaptiveScaler, true))
require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingEnableGetNumberOfPartitionsFromCache, true))
require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionUpscaleRPS, 200))
require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionDownscaleRPS, 100))
require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionDownscaleFactor, 0.75))
require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionUpscaleSustainedDuration, time.Second))
require.NoError(t, deps.dynamicClient.UpdateValue(dynamicconfig.MatchingPartitionDownscaleSustainedDuration, time.Second))
tc.mockSetup(deps)
Expand Down
4 changes: 2 additions & 2 deletions service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,8 +975,8 @@ func newTaskListConfig(id *Identifier, cfg *config.Config, domainName string) *c
PartitionUpscaleRPS: func() int {
return cfg.PartitionUpscaleRPS(domainName, taskListName, taskType)
},
PartitionDownscaleRPS: func() int {
return cfg.PartitionDownscaleRPS(domainName, taskListName, taskType)
PartitionDownscaleFactor: func() float64 {
return cfg.PartitionDownscaleFactor(domainName, taskListName, taskType)
},
PartitionUpscaleSustainedDuration: func() time.Duration {
return cfg.PartitionUpscaleSustainedDuration(domainName, taskListName, taskType)
Expand Down

0 comments on commit 5af33e8

Please sign in to comment.