Skip to content

Commit

Permalink
separate the transmission of domain replication in to frontend package
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Mar 2, 2018
1 parent 69c846f commit 1095015
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 188 deletions.
122 changes: 122 additions & 0 deletions service/frontend/domainReplicationTaskHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package frontend

import (
"errors"

"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/persistence"
)

var (
// ErrInvalidDomainStatus is the error to indicate invalid domain status
ErrInvalidDomainStatus = errors.New("invalid domain status attribute")
)

// NOTE: the conterpart of domain replication receiving logic is in service/worker package

type (
// DomainReplicator is the interface which can replicate the domain
DomainReplicator interface {
HandleTransmissionTask(domainOperation replicator.DomainOperation, info *persistence.DomainInfo,
config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
configVersion int64, failoverVersion int64) error
}

domainReplicatorImpl struct {
kafka messaging.Producer
logger bark.Logger
}
)

// NewDomainReplicator create a new instance odf domain replicator
func NewDomainReplicator(kafka messaging.Producer, logger bark.Logger) DomainReplicator {
return &domainReplicatorImpl{
kafka: kafka,
logger: logger,
}
}

// HandleTransmissionTask handle transmission of the domain replication task
func (domainReplicator *domainReplicatorImpl) HandleTransmissionTask(domainOperation replicator.DomainOperation,
info *persistence.DomainInfo, config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
configVersion int64, failoverVersion int64) error {
status, err := domainReplicator.convertDomainStatusToThrift(info.Status)
if err != nil {
return err
}

taskType := replicator.ReplicationTaskTypeDomain
task := &replicator.DomainTaskAttributes{
DomainOperation: &domainOperation,
ID: common.StringPtr(info.ID),
Info: &shared.DomainInfo{
Name: common.StringPtr(info.Name),
Status: status,
Description: common.StringPtr(info.Description),
OwnerEmail: common.StringPtr(info.OwnerEmail),
},
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(config.Retention),
EmitMetric: common.BoolPtr(config.EmitMetric),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(replicationConfig.ActiveClusterName),
Clusters: domainReplicator.convertClusterReplicationConfigToThrift(replicationConfig.Clusters),
},
ConfigVersion: common.Int64Ptr(configVersion),
FailoverVersion: common.Int64Ptr(failoverVersion),
}

return domainReplicator.kafka.Publish(&replicator.ReplicationTask{
TaskType: &taskType,
DomainTaskAttributes: task,
})
}

func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigToThrift(
input []*persistence.ClusterReplicationConfig) []*shared.ClusterReplicationConfiguration {
output := []*shared.ClusterReplicationConfiguration{}
for _, cluster := range input {
clusterName := common.StringPtr(cluster.ClusterName)
output = append(output, &shared.ClusterReplicationConfiguration{ClusterName: clusterName})
}
return output
}

func (domainReplicator *domainReplicatorImpl) convertDomainStatusToThrift(input int) (*shared.DomainStatus, error) {
switch input {
case persistence.DomainStatusRegistered:
output := shared.DomainStatusRegistered
return &output, nil
case persistence.DomainStatusDeprecated:
output := shared.DomainStatusDeprecated
return &output, nil
default:
return nil, ErrInvalidDomainStatus
}

}
206 changes: 206 additions & 0 deletions service/frontend/domainReplicationTaskHandler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package frontend

import (
"log"
"os"
"testing"

"github.com/pborman/uuid"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
)

type (
domainReplicatorSuite struct {
suite.Suite
domainReplicator *domainReplicatorImpl
kafkaProducer *mocks.KafkaProducer
}
)

func TestDomainReplicatorSuite(t *testing.T) {
s := new(domainReplicatorSuite)
suite.Run(t, s)
}

func (s *domainReplicatorSuite) SetupSuite() {
if testing.Verbose() {
log.SetOutput(os.Stdout)
}
}

func (s *domainReplicatorSuite) TearDownSuite() {

}

func (s *domainReplicatorSuite) SetupTest() {
s.kafkaProducer = &mocks.KafkaProducer{}
s.domainReplicator = NewDomainReplicator(
s.kafkaProducer,
bark.NewLoggerFromLogrus(logrus.New()),
).(*domainReplicatorImpl)

}

func (s *domainReplicatorSuite) TearDownTest() {
}

func (s *domainReplicatorSuite) TestHandleTransmissionTask_RegisterDomainTask() {
taskType := replicator.ReplicationTaskTypeDomain
id := uuid.New()
name := "some random domain test name"
status := shared.DomainStatusRegistered
description := "some random test description"
ownerEmail := "some random test owner"
retention := int32(10)
emitMetric := true
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
failoverVersion := int64(59)
clusters := []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{
ClusterName: clusterActive,
},
&persistence.ClusterReplicationConfig{
ClusterName: clusterStandby,
},
}

domainOperation := replicator.DomainOperationCreate
info := &persistence.DomainInfo{
ID: id,
Name: name,
Status: persistence.DomainStatusRegistered,
Description: description,
OwnerEmail: ownerEmail,
}
config := &persistence.DomainConfig{
Retention: retention,
EmitMetric: emitMetric,
}
replicationConfig := &persistence.DomainReplicationConfig{
ActiveClusterName: clusterActive,
Clusters: clusters,
}

s.kafkaProducer.On("Publish", &replicator.ReplicationTask{
TaskType: &taskType,
DomainTaskAttributes: &replicator.DomainTaskAttributes{
DomainOperation: &domainOperation,
ID: common.StringPtr(id),
Info: &shared.DomainInfo{
Name: common.StringPtr(name),
Status: &status,
Description: common.StringPtr(description),
OwnerEmail: common.StringPtr(ownerEmail),
},
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Clusters: s.domainReplicator.convertClusterReplicationConfigToThrift(clusters),
},
ConfigVersion: common.Int64Ptr(configVersion),
FailoverVersion: common.Int64Ptr(failoverVersion),
},
}).Return(nil).Once()

err := s.domainReplicator.HandleTransmissionTask(domainOperation, info, config, replicationConfig, configVersion, failoverVersion)
s.Nil(err)
}

func (s *domainReplicatorSuite) TestHandleTransmissionTask_UpdateDomainTask() {
taskType := replicator.ReplicationTaskTypeDomain
id := uuid.New()
name := "some random domain test name"
status := shared.DomainStatusDeprecated
description := "some random test description"
ownerEmail := "some random test owner"
retention := int32(10)
emitMetric := true
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(0)
failoverVersion := int64(59)
clusters := []*persistence.ClusterReplicationConfig{
&persistence.ClusterReplicationConfig{
ClusterName: clusterActive,
},
&persistence.ClusterReplicationConfig{
ClusterName: clusterStandby,
},
}

domainOperation := replicator.DomainOperationUpdate
info := &persistence.DomainInfo{
ID: id,
Name: name,
Status: persistence.DomainStatusDeprecated,
Description: description,
OwnerEmail: ownerEmail,
}
config := &persistence.DomainConfig{
Retention: retention,
EmitMetric: emitMetric,
}
replicationConfig := &persistence.DomainReplicationConfig{
ActiveClusterName: clusterActive,
Clusters: clusters,
}

s.kafkaProducer.On("Publish", &replicator.ReplicationTask{
TaskType: &taskType,
DomainTaskAttributes: &replicator.DomainTaskAttributes{
DomainOperation: &domainOperation,
ID: common.StringPtr(id),
Info: &shared.DomainInfo{
Name: common.StringPtr(name),
Status: &status,
Description: common.StringPtr(description),
OwnerEmail: common.StringPtr(ownerEmail),
},
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Clusters: s.domainReplicator.convertClusterReplicationConfigToThrift(clusters),
},
ConfigVersion: common.Int64Ptr(configVersion),
FailoverVersion: common.Int64Ptr(failoverVersion),
},
}).Return(nil).Once()

err := s.domainReplicator.HandleTransmissionTask(domainOperation, info, config, replicationConfig, configVersion, failoverVersion)
s.Nil(err)
}
11 changes: 4 additions & 7 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/service/worker"
"go.uber.org/yarpc/yarpcerrors"
)

Expand All @@ -69,8 +68,7 @@ type (
startWG sync.WaitGroup
rateLimiter common.TokenBucket
config *Config
kafkaProducer messaging.Producer
domainReplicator worker.DomainReplicator
domainReplicator DomainReplicator
service.Service
}

Expand Down Expand Up @@ -124,8 +122,7 @@ func NewWorkflowHandler(
hSerializerFactory: persistence.NewHistorySerializerFactory(),
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetLogger()),
rateLimiter: common.NewTokenBucket(config.RPS, common.NewRealTimeSource()),
kafkaProducer: kafkaProducer,
domainReplicator: worker.NewDomainReplicator(metadataMgr, sVice.GetLogger()),
domainReplicator: NewDomainReplicator(kafkaProducer, sVice.GetLogger()),
}
// prevent us from trying to serve requests before handler's Start() is complete
handler.startWG.Add(1)
Expand Down Expand Up @@ -250,7 +247,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *

// TODO remove the IsGlobalDomainEnabled check once cross DC is public
if clusterMetadata.IsGlobalDomainEnabled() {
err = wh.domainReplicator.HandleTransmissionTask(wh.kafkaProducer, replicator.DomainOperationCreate,
err = wh.domainReplicator.HandleTransmissionTask(replicator.DomainOperationCreate,
domainRequest.Info, domainRequest.Config, domainRequest.ReplicationConfig, 0, domainRequest.FailoverVersion)
if err != nil {
return wh.error(err, scope)
Expand Down Expand Up @@ -445,7 +442,7 @@ func (wh *WorkflowHandler) UpdateDomain(ctx context.Context,
}
// TODO remove the IsGlobalDomainEnabled check once cross DC is public
if clusterMetadata.IsGlobalDomainEnabled() {
err = wh.domainReplicator.HandleTransmissionTask(wh.kafkaProducer, replicator.DomainOperationUpdate,
err = wh.domainReplicator.HandleTransmissionTask(replicator.DomainOperationUpdate,
info, config, replicationConfig, configVersion, failoverVersion)
if err != nil {
return nil, wh.error(err, scope)
Expand Down
Loading

0 comments on commit 1095015

Please sign in to comment.