Skip to content

Commit

Permalink
Add cluster name filter (#3561)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed Oct 5, 2020
1 parent b24ea12 commit e3f0b40
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 54 deletions.
8 changes: 6 additions & 2 deletions cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,12 @@ func (s *server) startService() common.Daemon {
log.Printf("error creating file based dynamic config client, use no-op config client instead. error: %v", err)
params.DynamicConfig = dynamicconfig.NewNopClient()
}
dc := dynamicconfig.NewCollection(params.DynamicConfig, params.Logger)
clusterMetadata := s.cfg.ClusterMetadata
dc := dynamicconfig.NewCollection(
params.DynamicConfig,
params.Logger,
dynamicconfig.ClusterNameFilter(clusterMetadata.CurrentClusterName),
)

svcCfg := s.cfg.Services[s.name]
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger)
Expand All @@ -135,7 +140,6 @@ func (s *server) startService() common.Daemon {

params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))

clusterMetadata := s.cfg.ClusterMetadata
params.ClusterMetadata = cluster.NewMetadata(
params.Logger,
dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, clusterMetadata.EnableGlobalDomain),
Expand Down
6 changes: 5 additions & 1 deletion common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ func New(
return nil, err
}

dynamicCollection := dynamicconfig.NewCollection(params.DynamicConfig, logger)
dynamicCollection := dynamicconfig.NewCollection(
params.DynamicConfig,
logger,
dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
)
clientBean, err := client.NewClientBean(
client.NewRPCClientFactory(
params.RPCFactory,
Expand Down
149 changes: 122 additions & 27 deletions common/service/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,30 @@ const (
)

// NewCollection creates a new collection
func NewCollection(client Client, logger log.Logger) *Collection {
func NewCollection(
client Client,
logger log.Logger,
filterOptions ...FilterOption,
) *Collection {

return &Collection{
client: client,
logger: logger,
keys: &sync.Map{},
errCount: -1,
client: client,
logger: logger,
keys: &sync.Map{},
errCount: -1,
filterOptions: filterOptions,
}
}

// Collection wraps dynamic config client with a closure so that across the code, the config values
// can be directly accessed by calling the function without propagating the client everywhere in
// code
type Collection struct {
client Client
logger log.Logger
keys *sync.Map // map of config Key to strongly typed value
errCount int64
client Client
logger log.Logger
keys *sync.Map // map of config Key to strongly typed value
errCount int64
filterOptions []FilterOption
}

func (c *Collection) logError(key Key, err error) {
Expand Down Expand Up @@ -155,7 +162,12 @@ func getFilterMap(opts ...FilterOption) map[Filter]interface{} {
// GetIntProperty gets property and asserts that it's an integer
func (c *Collection) GetIntProperty(key Key, defaultValue int) IntPropertyFn {
return func(opts ...FilterOption) int {
val, err := c.client.GetIntValue(key, getFilterMap(opts...), defaultValue)
opts = append(opts, c.filterOptions...)
val, err := c.client.GetIntValue(
key,
getFilterMap(opts...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -166,8 +178,14 @@ func (c *Collection) GetIntProperty(key Key, defaultValue int) IntPropertyFn {

// GetIntPropertyFilteredByDomain gets property with domain filter and asserts that it's an integer
func (c *Collection) GetIntPropertyFilteredByDomain(key Key, defaultValue int) IntPropertyFnWithDomainFilter {

return func(domain string) int {
val, err := c.client.GetIntValue(key, getFilterMap(DomainFilter(domain)), defaultValue)
filters := append([]FilterOption{DomainFilter(domain)}, c.filterOptions...)
val, err := c.client.GetIntValue(
key,
getFilterMap(filters...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -179,9 +197,17 @@ func (c *Collection) GetIntPropertyFilteredByDomain(key Key, defaultValue int) I
// GetIntPropertyFilteredByTaskListInfo gets property with taskListInfo as filters and asserts that it's an integer
func (c *Collection) GetIntPropertyFilteredByTaskListInfo(key Key, defaultValue int) IntPropertyFnWithTaskListInfoFilters {
return func(domain string, taskList string, taskType int) int {
filters := append(
[]FilterOption{
DomainFilter(domain),
TaskListFilter(taskList),
TaskTypeFilter(taskType),
},
c.filterOptions...,
)
val, err := c.client.GetIntValue(
key,
getFilterMap(DomainFilter(domain), TaskListFilter(taskList), TaskTypeFilter(taskType)),
getFilterMap(filters...),
defaultValue,
)
if err != nil {
Expand All @@ -195,9 +221,10 @@ func (c *Collection) GetIntPropertyFilteredByTaskListInfo(key Key, defaultValue
// GetIntPropertyFilteredByShardID gets property with shardID as filter and asserts that it's an integer
func (c *Collection) GetIntPropertyFilteredByShardID(key Key, defaultValue int) IntPropertyFnWithShardIDFilter {
return func(shardID int) int {
filters := append([]FilterOption{ShardIDFilter(shardID)}, c.filterOptions...)
val, err := c.client.GetIntValue(
key,
getFilterMap(ShardIDFilter(shardID)),
getFilterMap(filters...),
defaultValue,
)
if err != nil {
Expand All @@ -211,7 +238,12 @@ func (c *Collection) GetIntPropertyFilteredByShardID(key Key, defaultValue int)
// GetFloat64Property gets property and asserts that it's a float64
func (c *Collection) GetFloat64Property(key Key, defaultValue float64) FloatPropertyFn {
return func(opts ...FilterOption) float64 {
val, err := c.client.GetFloatValue(key, getFilterMap(opts...), defaultValue)
opts = append(opts, c.filterOptions...)
val, err := c.client.GetFloatValue(
key,
getFilterMap(opts...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -223,9 +255,10 @@ func (c *Collection) GetFloat64Property(key Key, defaultValue float64) FloatProp
// GetFloat64PropertyFilteredByShardID gets property with shardID filter and asserts that it's a float64
func (c *Collection) GetFloat64PropertyFilteredByShardID(key Key, defaultValue float64) FloatPropertyFnWithShardIDFilter {
return func(shardID int) float64 {
filters := append([]FilterOption{ShardIDFilter(shardID)}, c.filterOptions...)
val, err := c.client.GetFloatValue(
key,
getFilterMap(ShardIDFilter(shardID)),
getFilterMap(filters...),
defaultValue,
)
if err != nil {
Expand All @@ -239,7 +272,12 @@ func (c *Collection) GetFloat64PropertyFilteredByShardID(key Key, defaultValue f
// GetDurationProperty gets property and asserts that it's a duration
func (c *Collection) GetDurationProperty(key Key, defaultValue time.Duration) DurationPropertyFn {
return func(opts ...FilterOption) time.Duration {
val, err := c.client.GetDurationValue(key, getFilterMap(opts...), defaultValue)
opts = append(opts, c.filterOptions...)
val, err := c.client.GetDurationValue(
key,
getFilterMap(opts...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -251,7 +289,12 @@ func (c *Collection) GetDurationProperty(key Key, defaultValue time.Duration) Du
// GetDurationPropertyFilteredByDomain gets property with domain filter and asserts that it's a duration
func (c *Collection) GetDurationPropertyFilteredByDomain(key Key, defaultValue time.Duration) DurationPropertyFnWithDomainFilter {
return func(domain string) time.Duration {
val, err := c.client.GetDurationValue(key, getFilterMap(DomainFilter(domain)), defaultValue)
filters := append([]FilterOption{DomainFilter(domain)}, c.filterOptions...)
val, err := c.client.GetDurationValue(
key,
getFilterMap(filters...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -263,7 +306,12 @@ func (c *Collection) GetDurationPropertyFilteredByDomain(key Key, defaultValue t
// GetDurationPropertyFilteredByDomainID gets property with domainID filter and asserts that it's a duration
func (c *Collection) GetDurationPropertyFilteredByDomainID(key Key, defaultValue time.Duration) DurationPropertyFnWithDomainIDFilter {
return func(domainID string) time.Duration {
val, err := c.client.GetDurationValue(key, getFilterMap(DomainIDFilter(domainID)), defaultValue)
filters := append([]FilterOption{DomainIDFilter(domainID)}, c.filterOptions...)
val, err := c.client.GetDurationValue(
key,
getFilterMap(filters...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -275,9 +323,17 @@ func (c *Collection) GetDurationPropertyFilteredByDomainID(key Key, defaultValue
// GetDurationPropertyFilteredByTaskListInfo gets property with taskListInfo as filters and asserts that it's a duration
func (c *Collection) GetDurationPropertyFilteredByTaskListInfo(key Key, defaultValue time.Duration) DurationPropertyFnWithTaskListInfoFilters {
return func(domain string, taskList string, taskType int) time.Duration {
filters := append(
[]FilterOption{
DomainFilter(domain),
TaskListFilter(taskList),
TaskTypeFilter(taskType),
},
c.filterOptions...,
)
val, err := c.client.GetDurationValue(
key,
getFilterMap(DomainFilter(domain), TaskListFilter(taskList), TaskTypeFilter(taskType)),
getFilterMap(filters...),
defaultValue,
)
if err != nil {
Expand All @@ -291,9 +347,10 @@ func (c *Collection) GetDurationPropertyFilteredByTaskListInfo(key Key, defaultV
// GetDurationPropertyFilteredByShardID gets property with shardID id as filter and asserts that it's a duration
func (c *Collection) GetDurationPropertyFilteredByShardID(key Key, defaultValue time.Duration) DurationPropertyFnWithShardIDFilter {
return func(shardID int) time.Duration {
filters := append([]FilterOption{ShardIDFilter(shardID)}, c.filterOptions...)
val, err := c.client.GetDurationValue(
key,
getFilterMap(ShardIDFilter(shardID)),
getFilterMap(filters...),
defaultValue,
)
if err != nil {
Expand All @@ -307,7 +364,12 @@ func (c *Collection) GetDurationPropertyFilteredByShardID(key Key, defaultValue
// GetBoolProperty gets property and asserts that it's an bool
func (c *Collection) GetBoolProperty(key Key, defaultValue bool) BoolPropertyFn {
return func(opts ...FilterOption) bool {
val, err := c.client.GetBoolValue(key, getFilterMap(opts...), defaultValue)
opts = append(opts, c.filterOptions...)
val, err := c.client.GetBoolValue(
key,
getFilterMap(opts...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -319,7 +381,12 @@ func (c *Collection) GetBoolProperty(key Key, defaultValue bool) BoolPropertyFn
// GetStringProperty gets property and asserts that it's an string
func (c *Collection) GetStringProperty(key Key, defaultValue string) StringPropertyFn {
return func(opts ...FilterOption) string {
val, err := c.client.GetStringValue(key, getFilterMap(opts...), defaultValue)
opts = append(opts, c.filterOptions...)
val, err := c.client.GetStringValue(
key,
getFilterMap(opts...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -331,7 +398,12 @@ func (c *Collection) GetStringProperty(key Key, defaultValue string) StringPrope
// GetMapProperty gets property and asserts that it's a map
func (c *Collection) GetMapProperty(key Key, defaultValue map[string]interface{}) MapPropertyFn {
return func(opts ...FilterOption) map[string]interface{} {
val, err := c.client.GetMapValue(key, getFilterMap(opts...), defaultValue)
opts = append(opts, c.filterOptions...)
val, err := c.client.GetMapValue(
key,
getFilterMap(opts...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -343,7 +415,12 @@ func (c *Collection) GetMapProperty(key Key, defaultValue map[string]interface{}
// GetStringPropertyFilteredByDomain gets property with domain filter and asserts that it's a string
func (c *Collection) GetStringPropertyFilteredByDomain(key Key, defaultValue string) StringPropertyFnWithDomainFilter {
return func(domain string) string {
val, err := c.client.GetStringValue(key, getFilterMap(DomainFilter(domain)), defaultValue)
filters := append([]FilterOption{DomainFilter(domain)}, c.filterOptions...)
val, err := c.client.GetStringValue(
key,
getFilterMap(filters...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -355,7 +432,12 @@ func (c *Collection) GetStringPropertyFilteredByDomain(key Key, defaultValue str
// GetBoolPropertyFilteredByDomain gets property with domain filter and asserts that it's a bool
func (c *Collection) GetBoolPropertyFilteredByDomain(key Key, defaultValue bool) BoolPropertyFnWithDomainFilter {
return func(domain string) bool {
val, err := c.client.GetBoolValue(key, getFilterMap(DomainFilter(domain)), defaultValue)
filters := append([]FilterOption{DomainFilter(domain)}, c.filterOptions...)
val, err := c.client.GetBoolValue(
key,
getFilterMap(filters...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -367,7 +449,12 @@ func (c *Collection) GetBoolPropertyFilteredByDomain(key Key, defaultValue bool)
// GetBoolPropertyFilteredByDomainID gets property with domainID filter and asserts that it's a bool
func (c *Collection) GetBoolPropertyFilteredByDomainID(key Key, defaultValue bool) BoolPropertyFnWithDomainIDFilter {
return func(domainID string) bool {
val, err := c.client.GetBoolValue(key, getFilterMap(DomainIDFilter(domainID)), defaultValue)
filters := append([]FilterOption{DomainIDFilter(domainID)}, c.filterOptions...)
val, err := c.client.GetBoolValue(
key,
getFilterMap(filters...),
defaultValue,
)
if err != nil {
c.logError(key, err)
}
Expand All @@ -379,9 +466,17 @@ func (c *Collection) GetBoolPropertyFilteredByDomainID(key Key, defaultValue boo
// GetBoolPropertyFilteredByTaskListInfo gets property with taskListInfo as filters and asserts that it's an bool
func (c *Collection) GetBoolPropertyFilteredByTaskListInfo(key Key, defaultValue bool) BoolPropertyFnWithTaskListInfoFilters {
return func(domain string, taskList string, taskType int) bool {
filters := append(
[]FilterOption{
DomainFilter(domain),
TaskListFilter(taskList),
TaskTypeFilter(taskType),
},
c.filterOptions...,
)
val, err := c.client.GetBoolValue(
key,
getFilterMap(DomainFilter(domain), TaskListFilter(taskList), TaskTypeFilter(taskType)),
getFilterMap(filters...),
defaultValue,
)
if err != nil {
Expand Down
31 changes: 30 additions & 1 deletion common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,19 +910,39 @@ const (
type Filter int

func (f Filter) String() string {
if f <= unknownFilter || f > ShardID {
if f <= unknownFilter || f > ClusterName {
return filters[unknownFilter]
}
return filters[f]
}

func parseFilter(filterName string) Filter {
switch filterName {
case "domainName":
return DomainName
case "domainID":
return DomainID
case "taskListName":
return TaskListName
case "taskType":
return TaskType
case "shardID":
return ShardID
case "clusterName":
return ClusterName
default:
return unknownFilter
}
}

var filters = []string{
"unknownFilter",
"domainName",
"domainID",
"taskListName",
"taskType",
"shardID",
"clusterName",
}

const (
Expand All @@ -937,6 +957,8 @@ const (
TaskType
// ShardID is the shard id
ShardID
// ClusterName is the cluster name in a multi-region setup
ClusterName

// lastFilterTypeForTest must be the last one in this const group for testing purpose
lastFilterTypeForTest
Expand Down Expand Up @@ -979,3 +1001,10 @@ func ShardIDFilter(shardID int) FilterOption {
filterMap[ShardID] = shardID
}
}

// ClusterNameFilter filters by cluster name
func ClusterNameFilter(clusterName string) FilterOption {
return func(filterMap map[Filter]interface{}) {
filterMap[ClusterName] = clusterName
}
}
Loading

0 comments on commit e3f0b40

Please sign in to comment.