Skip to content

Commit

Permalink
Update worker replication config (cadence-workflow#3373)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed May 4, 2021
1 parent fd24bba commit 67900e4
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 19 deletions.
6 changes: 3 additions & 3 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ var keys = map[Key]string{
WorkerReplicationTaskContextDuration: "worker.replicationTaskContextDuration",
WorkerReReplicationContextTimeout: "worker.workerReReplicationContextTimeout",
WorkerEnableRPCReplication: "worker.enableWorkerRPCReplication",
WorkerEnableHistoryReplication: "worker.enableWorkerHistoryReplication",
WorkerEnableKafkaReplication: "worker.enableKafkaReplication",
WorkerIndexerConcurrency: "worker.indexerConcurrency",
WorkerESProcessorNumOfWorkers: "worker.ESProcessorNumOfWorkers",
WorkerESProcessorBulkActions: "worker.ESProcessorBulkActions",
Expand Down Expand Up @@ -760,8 +760,8 @@ const (
WorkerReReplicationContextTimeout
// WorkerEnableRPCReplication is the feature flag for RPC replication
WorkerEnableRPCReplication
// WorkerEnableHistoryReplication is the feature flag for history replication
WorkerEnableHistoryReplication
// WorkerEnableKafkaReplication is the feature flag for kafka replication
WorkerEnableKafkaReplication
// WorkerIndexerConcurrency is the max concurrent messages to be processed at any given time
WorkerIndexerConcurrency
// WorkerESProcessorNumOfWorkers is num of workers for esProcessor
Expand Down
16 changes: 8 additions & 8 deletions common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,32 @@ func TestConvertDynamicConfigMapPropertyToIntMap(t *testing.T) {
}

func TestCreateHistoryStartWorkflowRequest_ExpirationTimeWithCron(t *testing.T) {
domainId := uuid.New()
domainID := uuid.New()
request := &workflow.StartWorkflowExecutionRequest{
RetryPolicy: &workflow.RetryPolicy{
RetryPolicy: &workflow.RetryPolicy{
InitialIntervalInSeconds: Int32Ptr(60),
ExpirationIntervalInSeconds: Int32Ptr(60),
},
CronSchedule: StringPtr("@every 300s"),
CronSchedule: StringPtr("@every 300s"),
}
now := time.Now()
startRequest := CreateHistoryStartWorkflowRequest(domainId, request)
startRequest := CreateHistoryStartWorkflowRequest(domainID, request)

expirationTime := startRequest.GetExpirationTimestamp()
require.NotNil(t, expirationTime)
require.True(t, time.Unix(0, expirationTime).Sub(now) > 60 * time.Second)
require.True(t, time.Unix(0, expirationTime).Sub(now) > 60*time.Second)
}

func TestCreateHistoryStartWorkflowRequest_ExpirationTimeWithoutCron(t *testing.T) {
domainId := uuid.New()
domainID := uuid.New()
request := &workflow.StartWorkflowExecutionRequest{
RetryPolicy: &workflow.RetryPolicy{
RetryPolicy: &workflow.RetryPolicy{
InitialIntervalInSeconds: Int32Ptr(60),
ExpirationIntervalInSeconds: Int32Ptr(60),
},
}
now := time.Now()
startRequest := CreateHistoryStartWorkflowRequest(domainId, request)
startRequest := CreateHistoryStartWorkflowRequest(domainID, request)

expirationTime := startRequest.GetExpirationTimestamp()
require.NotNil(t, expirationTime)
Expand Down
10 changes: 4 additions & 6 deletions service/worker/replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type (
ReplicationTaskContextTimeout dynamicconfig.DurationPropertyFn
ReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableRPCReplication dynamicconfig.BoolPropertyFn
EnableHistoryReplication dynamicconfig.BoolPropertyFn
EnableKafkaReplication dynamicconfig.BoolPropertyFn
}
)

Expand Down Expand Up @@ -143,11 +143,9 @@ func (r *Replicator) Start() error {
}
}

if r.config.EnableHistoryReplication() {
for _, processor := range r.processors {
if err := processor.Start(); err != nil {
return err
}
for _, processor := range r.processors {
if err := processor.Start(); err != nil {
return err
}
}
for _, domainProcessor := range r.domainProcessors {
Expand Down
4 changes: 2 additions & 2 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func NewConfig(params *service.BootstrapParams) *Config {
ReplicationTaskContextTimeout: dc.GetDurationProperty(dynamicconfig.WorkerReplicationTaskContextDuration, 30*time.Second),
ReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.WorkerReReplicationContextTimeout, 0*time.Second),
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.WorkerEnableRPCReplication, false),
EnableHistoryReplication: dc.GetBoolProperty(dynamicconfig.WorkerEnableRPCReplication, true),
EnableKafkaReplication: dc.GetBoolProperty(dynamicconfig.WorkerEnableKafkaReplication, false),
},
ArchiverConfig: &archiver.Config{
ArchiverConcurrency: dc.GetIntProperty(dynamicconfig.WorkerArchiverConcurrency, 50),
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s *Service) Start() {
s.startIndexer()
}

if s.GetClusterMetadata().IsGlobalDomainEnabled() {
if s.GetClusterMetadata().IsGlobalDomainEnabled() && s.config.ReplicationCfg.EnableKafkaReplication() {
s.startReplicator()
}
if s.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival() {
Expand Down

0 comments on commit 67900e4

Please sign in to comment.