Skip to content

Commit

Permalink
wire replicator transmission to domain APIs (#590)
Browse files Browse the repository at this point in the history
* wire replicator transmission to domain APIs
  • Loading branch information
wxing1292 authored Mar 2, 2018
1 parent 7d658c9 commit 5c6cb08
Show file tree
Hide file tree
Showing 15 changed files with 520 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cmd/server/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func startHandler(c *cli.Context) {
log.Fatal("Unable to get current directory")
}
if err := cassandra.VerifyCompatibleVersion(cassCfg, dir); err != nil {
log.Fatalf("Incompatible versions", err)
log.Fatal("Incompatible versions", err)
}

services := getServices(c)
Expand Down
76 changes: 76 additions & 0 deletions common/mocks/KafkaProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mocks

import (
mock "github.com/stretchr/testify/mock"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/common/messaging"
)

// KafkaProducer is an autogenerated mock type for the KafkaProducer type
type KafkaProducer struct {
mock.Mock
}

// Close provides a mock function with given fields:
func (_m *KafkaProducer) Close() error {
ret := _m.Called()

var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}

return r0
}

// Publish provides a mock function with given fields: msg
func (_m *KafkaProducer) Publish(msg *replicator.ReplicationTask) error {
ret := _m.Called(msg)

var r0 error
if rf, ok := ret.Get(0).(func(*replicator.ReplicationTask) error); ok {
r0 = rf(msg)
} else {
r0 = ret.Error(0)
}

return r0
}

// PublishBatch provides a mock function with given fields: msgs
func (_m *KafkaProducer) PublishBatch(msgs []*replicator.ReplicationTask) error {
ret := _m.Called(msgs)

var r0 error
if rf, ok := ret.Get(0).(func([]*replicator.ReplicationTask) error); ok {
r0 = rf(msgs)
} else {
r0 = ret.Error(0)
}

return r0
}

var _ messaging.Producer = (*KafkaProducer)(nil)
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *History
}

func (b *HistoryEventBatch) String() string {
return fmt.Sprint("[version:%v, events:%v]", b.Version, b.Events)
return fmt.Sprintf("[version:%v, events:%v]", b.Version, b.Events)
}

// NewSerializedHistoryEventBatch constructs and returns a new instance of of SerializedHistoryEventBatch
Expand Down
7 changes: 7 additions & 0 deletions common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type (
runtimeMetricsReporter *metrics.RuntimeMetricsReporter
metricsClient metrics.Client
clusterMetadata cluster.Metadata
messagingClient messaging.Client
dynamicCollection *dynamicconfig.Collection
}
)
Expand All @@ -105,6 +106,7 @@ func New(params *BootstrapParams) Service {
metricsScope: params.MetricScope,
numberOfHistoryShards: params.CassandraConfig.NumHistoryShards,
clusterMetadata: params.ClusterMetadata,
messagingClient: params.MessagingClient,
dynamicCollection: dynamicconfig.NewCollection(params.DynamicConfig),
}
sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.logger)
Expand Down Expand Up @@ -227,6 +229,11 @@ func (h *serviceImpl) GetClusterMetadata() cluster.Metadata {
return h.clusterMetadata
}

// GetMessagingClient returns the messaging client against Kafka
func (h *serviceImpl) GetMessagingClient() messaging.Client {
return h.messagingClient
}

func getMetricsServiceIdx(serviceName string, logger bark.Logger) metrics.ServiceIdx {
switch serviceName {
case common.FrontendServiceName:
Expand Down
4 changes: 4 additions & 0 deletions common/service/serviceinterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/uber/cadence/client"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"go.uber.org/yarpc"
)
Expand Down Expand Up @@ -55,5 +56,8 @@ type (

// GetClusterMetadata returns the service cluster metadata
GetClusterMetadata() cluster.Metadata

// GetMessagingClient returns the messaging client against Kafka
GetMessagingClient() messaging.Client
}
)
8 changes: 7 additions & 1 deletion host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import (

"errors"

"github.com/stretchr/testify/mock"
"github.com/uber-common/bark"
"github.com/uber-go/tally"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
fecli "github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
Expand Down Expand Up @@ -197,9 +199,13 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW
params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
params.CassandraConfig.Hosts = "127.0.0.1"

// TODO when cross DC is public, remove this temporary override
kafkaProducer := &mocks.KafkaProducer{}
kafkaProducer.On("Publish", mock.Anything).Return(nil)

c.frontEndService = service.New(params)
c.frontendHandler = frontend.NewWorkflowHandler(
c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr)
c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr, kafkaProducer)
err := c.frontendHandler.Start()
if err != nil {
c.logger.WithField("error", err).Fatal("Failed to start frontend")
Expand Down
121 changes: 121 additions & 0 deletions service/frontend/domainReplicationTaskHandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package frontend

import (
"errors"

"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/persistence"
)

var (
// ErrInvalidDomainStatus is the error to indicate invalid domain status
ErrInvalidDomainStatus = errors.New("invalid domain status attribute")
)

// NOTE: the counterpart of domain replication receiving logic is in service/worker package

type (
// DomainReplicator is the interface which can replicate the domain
DomainReplicator interface {
HandleTransmissionTask(domainOperation replicator.DomainOperation, info *persistence.DomainInfo,
config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
configVersion int64, failoverVersion int64) error
}

domainReplicatorImpl struct {
kafka messaging.Producer
logger bark.Logger
}
)

// NewDomainReplicator create a new instance odf domain replicator
func NewDomainReplicator(kafka messaging.Producer, logger bark.Logger) DomainReplicator {
return &domainReplicatorImpl{
kafka: kafka,
logger: logger,
}
}

// HandleTransmissionTask handle transmission of the domain replication task
func (domainReplicator *domainReplicatorImpl) HandleTransmissionTask(domainOperation replicator.DomainOperation,
info *persistence.DomainInfo, config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig,
configVersion int64, failoverVersion int64) error {
status, err := domainReplicator.convertDomainStatusToThrift(info.Status)
if err != nil {
return err
}

taskType := replicator.ReplicationTaskTypeDomain
task := &replicator.DomainTaskAttributes{
DomainOperation: &domainOperation,
ID: common.StringPtr(info.ID),
Info: &shared.DomainInfo{
Name: common.StringPtr(info.Name),
Status: status,
Description: common.StringPtr(info.Description),
OwnerEmail: common.StringPtr(info.OwnerEmail),
},
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(config.Retention),
EmitMetric: common.BoolPtr(config.EmitMetric),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(replicationConfig.ActiveClusterName),
Clusters: domainReplicator.convertClusterReplicationConfigToThrift(replicationConfig.Clusters),
},
ConfigVersion: common.Int64Ptr(configVersion),
FailoverVersion: common.Int64Ptr(failoverVersion),
}

return domainReplicator.kafka.Publish(&replicator.ReplicationTask{
TaskType: &taskType,
DomainTaskAttributes: task,
})
}

func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigToThrift(
input []*persistence.ClusterReplicationConfig) []*shared.ClusterReplicationConfiguration {
output := []*shared.ClusterReplicationConfiguration{}
for _, cluster := range input {
clusterName := common.StringPtr(cluster.ClusterName)
output = append(output, &shared.ClusterReplicationConfiguration{ClusterName: clusterName})
}
return output
}

func (domainReplicator *domainReplicatorImpl) convertDomainStatusToThrift(input int) (*shared.DomainStatus, error) {
switch input {
case persistence.DomainStatusRegistered:
output := shared.DomainStatusRegistered
return &output, nil
case persistence.DomainStatusDeprecated:
output := shared.DomainStatusDeprecated
return &output, nil
default:
return nil, ErrInvalidDomainStatus
}
}
Loading

0 comments on commit 5c6cb08

Please sign in to comment.