Skip to content

Commit

Permalink
wire replicator transmission to domain APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Mar 2, 2018
1 parent 7d658c9 commit 612e1a6
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cmd/server/cadence.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func startHandler(c *cli.Context) {
log.Fatal("Unable to get current directory")
}
if err := cassandra.VerifyCompatibleVersion(cassCfg, dir); err != nil {
log.Fatalf("Incompatible versions", err)
log.Fatal("Incompatible versions", err)
}

services := getServices(c)
Expand Down
76 changes: 76 additions & 0 deletions common/mocks/KafkaProducer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package mocks

import (
mock "github.com/stretchr/testify/mock"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/common/messaging"
)

// KafkaProducer is an autogenerated mock type for the KafkaProducer type
type KafkaProducer struct {
mock.Mock
}

// Close provides a mock function with given fields:
func (_m *KafkaProducer) Close() error {
ret := _m.Called()

var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}

return r0
}

// Publish provides a mock function with given fields: msg
func (_m *KafkaProducer) Publish(msg *replicator.ReplicationTask) error {
ret := _m.Called(msg)

var r0 error
if rf, ok := ret.Get(0).(func(*replicator.ReplicationTask) error); ok {
r0 = rf(msg)
} else {
r0 = ret.Error(0)
}

return r0
}

// PublishBatch provides a mock function with given fields: msgs
func (_m *KafkaProducer) PublishBatch(msgs []*replicator.ReplicationTask) error {
ret := _m.Called(msgs)

var r0 error
if rf, ok := ret.Get(0).(func([]*replicator.ReplicationTask) error); ok {
r0 = rf(msgs)
} else {
r0 = ret.Error(0)
}

return r0
}

var _ messaging.Producer = (*KafkaProducer)(nil)
2 changes: 1 addition & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *History
}

func (b *HistoryEventBatch) String() string {
return fmt.Sprint("[version:%v, events:%v]", b.Version, b.Events)
return fmt.Sprintf("[version:%v, events:%v]", b.Version, b.Events)
}

// NewSerializedHistoryEventBatch constructs and returns a new instance of of SerializedHistoryEventBatch
Expand Down
7 changes: 7 additions & 0 deletions common/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type (
runtimeMetricsReporter *metrics.RuntimeMetricsReporter
metricsClient metrics.Client
clusterMetadata cluster.Metadata
messagingClient messaging.Client
dynamicCollection *dynamicconfig.Collection
}
)
Expand All @@ -105,6 +106,7 @@ func New(params *BootstrapParams) Service {
metricsScope: params.MetricScope,
numberOfHistoryShards: params.CassandraConfig.NumHistoryShards,
clusterMetadata: params.ClusterMetadata,
messagingClient: params.MessagingClient,
dynamicCollection: dynamicconfig.NewCollection(params.DynamicConfig),
}
sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(params.MetricScope, time.Minute, sVice.logger)
Expand Down Expand Up @@ -227,6 +229,11 @@ func (h *serviceImpl) GetClusterMetadata() cluster.Metadata {
return h.clusterMetadata
}

// GetMessagingClient returns the messaging client against Kafka
func (h *serviceImpl) GetMessagingClient() messaging.Client {
return h.messagingClient
}

func getMetricsServiceIdx(serviceName string, logger bark.Logger) metrics.ServiceIdx {
switch serviceName {
case common.FrontendServiceName:
Expand Down
4 changes: 4 additions & 0 deletions common/service/serviceinterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/uber/cadence/client"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"go.uber.org/yarpc"
)
Expand Down Expand Up @@ -55,5 +56,8 @@ type (

// GetClusterMetadata returns the service cluster metadata
GetClusterMetadata() cluster.Metadata

// GetMessagingClient returns the messaging client against Kafka
GetMessagingClient() messaging.Client
}
)
8 changes: 7 additions & 1 deletion host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import (

"errors"

"github.com/stretchr/testify/mock"
"github.com/uber-common/bark"
"github.com/uber-go/tally"
"github.com/uber/cadence/.gen/go/cadence/workflowserviceclient"
fecli "github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
Expand Down Expand Up @@ -197,9 +199,13 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW
params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
params.CassandraConfig.Hosts = "127.0.0.1"

// TODO when cross DC is public, remove this temporary override
kafkaProducer := &mocks.KafkaProducer{}
kafkaProducer.On("Publish", mock.Anything).Return(nil)

c.frontEndService = service.New(params)
c.frontendHandler = frontend.NewWorkflowHandler(
c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr)
c.frontEndService, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr, kafkaProducer)
err := c.frontendHandler.Start()
if err != nil {
c.logger.WithField("error", err).Fatal("Failed to start frontend")
Expand Down
36 changes: 32 additions & 4 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"sync"
"time"

"github.com/uber/cadence/common/messaging"

"github.com/pborman/uuid"
"github.com/uber-common/bark"
"github.com/uber-go/tally"
Expand All @@ -35,6 +37,7 @@ import (
"github.com/uber/cadence/.gen/go/health/metaserver"
h "github.com/uber/cadence/.gen/go/history"
m "github.com/uber/cadence/.gen/go/matching"
"github.com/uber/cadence/.gen/go/replicator"
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/client/matching"
Expand All @@ -45,6 +48,7 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/service/worker"
"go.uber.org/yarpc/yarpcerrors"
)

Expand All @@ -65,6 +69,8 @@ type (
startWG sync.WaitGroup
rateLimiter common.TokenBucket
config *Config
kafkaProducer messaging.Producer
DomainReplicator worker.DomainReplicator
service.Service
}

Expand Down Expand Up @@ -106,7 +112,8 @@ var (
// NewWorkflowHandler creates a thrift handler for the cadence service
func NewWorkflowHandler(
sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager) *WorkflowHandler {
historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager,
kafkaProducer messaging.Producer) *WorkflowHandler {
handler := &WorkflowHandler{
Service: sVice,
config: config,
Expand All @@ -117,6 +124,8 @@ func NewWorkflowHandler(
hSerializerFactory: persistence.NewHistorySerializerFactory(),
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetLogger()),
rateLimiter: common.NewTokenBucket(config.RPS, common.NewRealTimeSource()),
kafkaProducer: kafkaProducer,
DomainReplicator: worker.NewDomainReplicator(metadataMgr, sVice.GetLogger()),
}
// prevent us from trying to serve requests before handler's Start() is complete
handler.startWG.Add(1)
Expand Down Expand Up @@ -214,7 +223,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *
return wh.error(err, scope)
}

response, err := wh.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{
domainRequest := &persistence.CreateDomainRequest{
Info: &persistence.DomainInfo{
ID: uuid.New(),
Name: registerRequest.GetName(),
Expand All @@ -232,14 +241,25 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *
},
IsGlobalDomain: clusterMetadata.IsGlobalDomainEnabled(),
FailoverVersion: 0, // TODO do something?
})
}

domainResponse, err := wh.metadataMgr.CreateDomain(domainRequest)
if err != nil {
return wh.error(err, scope)
}

// TODO remove the IsGlobalDomainEnabled check once cross DC is public
if clusterMetadata.IsGlobalDomainEnabled() {
err = wh.DomainReplicator.HandleTransmissionTask(wh.kafkaProducer, replicator.DomainOperationCreate,
domainRequest.Info, domainRequest.Config, domainRequest.ReplicationConfig, 0, domainRequest.FailoverVersion)
if err != nil {
return wh.error(err, scope)
}
}

// TODO: Log through logging framework. We need to have good auditing of domain CRUD
wh.GetLogger().Debugf("Register domain succeeded for name: %v, Id: %v", *registerRequest.Name, response.ID)
wh.GetLogger().Debugf("Register domain succeeded for name: %v, Id: %v", registerRequest.GetName(), domainResponse.ID)

return nil
}

Expand Down Expand Up @@ -423,6 +443,14 @@ func (wh *WorkflowHandler) UpdateDomain(ctx context.Context,
if err != nil {
return nil, wh.error(err, scope)
}
// TODO remove the IsGlobalDomainEnabled check once cross DC is public
if clusterMetadata.IsGlobalDomainEnabled() {
err = wh.DomainReplicator.HandleTransmissionTask(wh.kafkaProducer, replicator.DomainOperationUpdate,
info, config, replicationConfig, configVersion, failoverVersion)
if err != nil {
return nil, wh.error(err, scope)
}
}
} else if clusterMetadata.IsGlobalDomainEnabled() && !clusterMetadata.IsMasterCluster() {
// although there is no attr updated, just prevent customer to use the non master cluster
// for update domain, ever (except if customer want to do a domain failover)
Expand Down
7 changes: 6 additions & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ func (s *Service) Start() {

history = persistence.NewHistoryPersistenceClient(history, base.GetMetricsClient())

handler := NewWorkflowHandler(base, s.config, metadata, history, visibility)
kafkaProducer, err := base.GetMessagingClient().NewProducer(base.GetClusterMetadata().GetCurrentClusterName())
if err != nil {
log.Fatalf("Creating kafka producer failed: %v", err)
}

handler := NewWorkflowHandler(base, s.config, metadata, history, visibility, kafkaProducer)
handler.Start()

log.Infof("%v started", common.FrontendServiceName)
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2121,7 +2121,7 @@ func getScheduleID(activityID string, msBuilder *mutableStateBuilder) (int64, er
}
scheduleID, ok := msBuilder.GetScheduleIDByActivityID(activityID)
if !ok {
return 0, &workflow.BadRequestError{Message: fmt.Sprintf("No such activityID: %d\n", activityID)}
return 0, &workflow.BadRequestError{Message: fmt.Sprintf("No such activityID: %s\n", activityID)}
}
return scheduleID, nil
}
6 changes: 3 additions & 3 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1325,7 +1325,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfNoAidFound() {
Identity: &identity,
},
})
s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}")
s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}")
}

func (s *engineSuite) TestRespondActivityTaskCompletedUpdateExecutionFailed() {
Expand Down Expand Up @@ -1836,7 +1836,7 @@ func (s *engineSuite) TestRespondActivityTaskFailededIfNoAIdFound() {
Identity: &identity,
},
})
s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}")
s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}")
}

func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() {
Expand Down Expand Up @@ -2548,7 +2548,7 @@ func (s *engineSuite) TestRespondActivityTaskCanceledIfNoAidFound() {
Identity: &identity,
},
})
s.EqualError(err, "BadRequestError{Message: No such activityID: %!d(string=aid)\n}")
s.EqualError(err, "BadRequestError{Message: No such activityID: aid\n}")
}

func (s *engineSuite) TestRequestCancel_RespondDecisionTaskCompleted_NotScheduled() {
Expand Down
Loading

0 comments on commit 612e1a6

Please sign in to comment.