From 24aa94dad5e6e23476418cde25c7c31b0f30db75 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Mon, 20 Apr 2020 12:17:29 -0700 Subject: [PATCH] Move replication related code to subfolder under history (#3204) --- common/client/versionChecker.go | 4 +- common/constants.go | 2 + service/history/handler.go | 5 +- service/history/historyEngine.go | 22 ++-- .../dlq_handler.go} | 49 +++---- .../history/replication/dlq_handler_mock.go | 105 +++++++++++++++ .../dlq_handler_test.go} | 56 ++++---- .../task_executor.go} | 33 ++--- .../task_executor_mock.go} | 34 ++--- .../task_executor_test.go} | 89 +++++++------ .../task_fetcher.go} | 82 ++++++------ .../task_fetcher_mock.go} | 94 +++++++------- .../task_fetcher_test.go} | 36 +++--- .../task_processor.go} | 121 +++++++++--------- .../task_processor_mock.go} | 40 +++--- .../task_processor_test.go} | 82 ++++++------ service/history/replicationDLQHandler_mock.go | 105 --------------- service/history/replicatorQueueProcessor.go | 4 +- 18 files changed, 490 insertions(+), 473 deletions(-) rename service/history/{replicationDLQHandler.go => replication/dlq_handler.go} (83%) create mode 100644 service/history/replication/dlq_handler_mock.go rename service/history/{replicationDLQHandler_test.go => replication/dlq_handler_test.go} (77%) rename service/history/{replicationTaskExecutor.go => replication/task_executor.go} (92%) rename service/history/{replicationTaskExecutor_mock.go => replication/task_executor_mock.go} (60%) rename service/history/{replicationTaskExecutor_test.go => replication/task_executor_test.go} (71%) rename service/history/{replicationTaskFetcher.go => replication/task_fetcher.go} (81%) rename service/history/{replicationTaskFetcher_mock.go => replication/task_fetcher_mock.go} (52%) rename service/history/{replicationTaskFetcher_test.go => replication/task_fetcher_test.go} (81%) rename service/history/{replicationTaskProcessor.go => replication/task_processor.go} (82%) rename service/history/{replicationTaskProcessor_mock.go => replication/task_processor_mock.go} (60%) rename service/history/{replicationTaskProcessor_test.go => replication/task_processor_test.go} (73%) delete mode 100644 service/history/replicationDLQHandler_mock.go diff --git a/common/client/versionChecker.go b/common/client/versionChecker.go index 527957584ca..6e404568004 100644 --- a/common/client/versionChecker.go +++ b/common/client/versionChecker.go @@ -54,9 +54,9 @@ const ( JavaWorkerStickyQueryVersion = "1.0.0" // GoWorkerConsistentQueryVersion indicates the minimum client version of the go worker which supports ConsistentQuery GoWorkerConsistentQueryVersion = "1.5.0" - // JavaWorkerSendRawWorkflowHistoryVersion indicates the minimum client version of the java worker which supports RawHistoryQuery + // JavaWorkerRawHistoryQueryVersion indicates the minimum client version of the java worker which supports RawHistoryQuery JavaWorkerRawHistoryQueryVersion = "1.3.0" - // GoWorkerSendRawWorkflowHistoryVersion indicates the minimum client version of the go worker which supports RawHistoryQuery + // GoWorkerRawHistoryQueryVersion indicates the minimum client version of the go worker which supports RawHistoryQuery GoWorkerRawHistoryQueryVersion = "1.6.0" // CLIRawHistoryQueryVersion indicates the minimum CLI version of the go worker which supports RawHistoryQuery // Note: cli uses go client feature version diff --git a/common/constants.go b/common/constants.go index 8184fa4b83a..bc240e52568 100644 --- a/common/constants.go +++ b/common/constants.go @@ -45,6 +45,8 @@ const ( LastBlobNextPageToken = -1 // EndMessageID is the id of the end message, here we use the int64 max EndMessageID int64 = 1<<63 - 1 + // EmptyMessageID is the default start message ID for replication level + EmptyMessageID = -1 ) const ( diff --git a/service/history/handler.go b/service/history/handler.go index 32a0dfcb78b..1b7211bc817 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -49,6 +49,7 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/engine" "github.com/uber/cadence/service/history/events" + "github.com/uber/cadence/service/history/replication" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" ) @@ -66,7 +67,7 @@ type ( historyEventNotifier events.Notifier publisher messaging.Producer rateLimiter quotas.Limiter - replicationTaskFetchers ReplicationTaskFetchers + replicationTaskFetchers replication.TaskFetchers queueTaskProcessor task.Processor } ) @@ -125,7 +126,7 @@ func (h *Handler) Start() { } } - h.replicationTaskFetchers = NewReplicationTaskFetchers( + h.replicationTaskFetchers = replication.NewTaskFetchers( h.GetLogger(), h.config, h.GetClusterMetadata().GetReplicationConsumerConfig(), diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index f04eafae852..905cec6c617 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -58,6 +58,7 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/ndc" "github.com/uber/cadence/service/history/query" + "github.com/uber/cadence/service/history/replication" "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" @@ -70,6 +71,7 @@ const ( activityCancellationMsgActivityNotStarted = "ACTIVITY_ID_NOT_STARTED" defaultQueryFirstDecisionTaskWaitTime = time.Second queryFirstDecisionTaskCheckInterval = 200 * time.Millisecond + replicationTimeout = 30 * time.Second ) type ( @@ -99,13 +101,13 @@ type ( resetor reset.WorkflowResetor workflowResetter reset.WorkflowResetter queueTaskProcessor task.Processor - replicationTaskProcessors []ReplicationTaskProcessor + replicationTaskProcessors []replication.TaskProcessor publicClient workflowserviceclient.Interface eventsReapplier nDCEventsReapplier matchingClient matching.Client rawMatchingClient matching.Client clientChecker client.VersionChecker - replicationDLQHandler replicationDLQHandler + replicationDLQHandler replication.DLQHandler } ) @@ -161,7 +163,7 @@ func NewEngineWithShardContext( historyEventNotifier events.Notifier, publisher messaging.Producer, config *config.Config, - replicationTaskFetchers ReplicationTaskFetchers, + replicationTaskFetchers replication.TaskFetchers, rawMatchingClient matching.Client, queueTaskProcessor task.Processor, ) engine.Engine { @@ -265,7 +267,7 @@ func NewEngineWithShardContext( nil, shard.GetLogger(), ) - replicationTaskExecutor := newReplicationTaskExecutor( + replicationTaskExecutor := replication.NewTaskExecutor( currentClusterName, shard.GetDomainCache(), nDCHistoryResender, @@ -274,9 +276,9 @@ func NewEngineWithShardContext( shard.GetMetricsClient(), shard.GetLogger(), ) - var replicationTaskProcessors []ReplicationTaskProcessor + var replicationTaskProcessors []replication.TaskProcessor for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() { - replicationTaskProcessor := NewReplicationTaskProcessor( + replicationTaskProcessor := replication.NewTaskProcessor( shard, historyEngImpl, config, @@ -287,7 +289,7 @@ func NewEngineWithShardContext( replicationTaskProcessors = append(replicationTaskProcessors, replicationTaskProcessor) } historyEngImpl.replicationTaskProcessors = replicationTaskProcessors - replicationMessageHandler := newReplicationDLQHandler(shard, replicationTaskExecutor) + replicationMessageHandler := replication.NewDLQHandler(shard, replicationTaskExecutor) historyEngImpl.replicationDLQHandler = replicationMessageHandler shard.SetEngine(historyEngImpl) @@ -2944,7 +2946,7 @@ func (e *historyEngineImpl) ReadDLQMessages( request *r.ReadDLQMessagesRequest, ) (*r.ReadDLQMessagesResponse, error) { - tasks, token, err := e.replicationDLQHandler.readMessages( + tasks, token, err := e.replicationDLQHandler.ReadMessages( ctx, request.GetSourceCluster(), request.GetInclusiveEndMessageID(), @@ -2966,7 +2968,7 @@ func (e *historyEngineImpl) PurgeDLQMessages( request *r.PurgeDLQMessagesRequest, ) error { - return e.replicationDLQHandler.purgeMessages( + return e.replicationDLQHandler.PurgeMessages( request.GetSourceCluster(), request.GetInclusiveEndMessageID(), ) @@ -2977,7 +2979,7 @@ func (e *historyEngineImpl) MergeDLQMessages( request *r.MergeDLQMessagesRequest, ) (*r.MergeDLQMessagesResponse, error) { - token, err := e.replicationDLQHandler.mergeMessages( + token, err := e.replicationDLQHandler.MergeMessages( ctx, request.GetSourceCluster(), request.GetInclusiveEndMessageID(), diff --git a/service/history/replicationDLQHandler.go b/service/history/replication/dlq_handler.go similarity index 83% rename from service/history/replicationDLQHandler.go rename to service/history/replication/dlq_handler.go index 7095789ffbb..56d62fd64a5 100644 --- a/service/history/replicationDLQHandler.go +++ b/service/history/replication/dlq_handler.go @@ -18,9 +18,9 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replicationDLQHandler_mock.go -self_package github.com/uber/cadence/service/history +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination dlq_handler_mock.go -package history +package replication import ( "context" @@ -34,20 +34,20 @@ import ( ) type ( - // replicationDLQHandler is the interface handles replication DLQ messages - replicationDLQHandler interface { - readMessages( + // DLQHandler is the interface handles replication DLQ messages + DLQHandler interface { + ReadMessages( ctx context.Context, sourceCluster string, lastMessageID int64, pageSize int, pageToken []byte, ) ([]*replicator.ReplicationTask, []byte, error) - purgeMessages( + PurgeMessages( sourceCluster string, lastMessageID int64, ) error - mergeMessages( + MergeMessages( ctx context.Context, sourceCluster string, lastMessageID int64, @@ -56,26 +56,29 @@ type ( ) ([]byte, error) } - replicationDLQHandlerImpl struct { - replicationTaskExecutor replicationTaskExecutor - shard shard.Context - logger log.Logger + dlqHandlerImpl struct { + taskExecutor TaskExecutor + shard shard.Context + logger log.Logger } ) -func newReplicationDLQHandler( +var _ DLQHandler = (*dlqHandlerImpl)(nil) + +// NewDLQHandler initialize the replication message DLQ handler +func NewDLQHandler( shard shard.Context, - replicationTaskExecutor replicationTaskExecutor, -) replicationDLQHandler { + taskExecutor TaskExecutor, +) DLQHandler { - return &replicationDLQHandlerImpl{ - shard: shard, - replicationTaskExecutor: replicationTaskExecutor, - logger: shard.GetLogger(), + return &dlqHandlerImpl{ + shard: shard, + taskExecutor: taskExecutor, + logger: shard.GetLogger(), } } -func (r *replicationDLQHandlerImpl) readMessages( +func (r *dlqHandlerImpl) ReadMessages( ctx context.Context, sourceCluster string, lastMessageID int64, @@ -93,7 +96,7 @@ func (r *replicationDLQHandlerImpl) readMessages( return tasks, token, err } -func (r *replicationDLQHandlerImpl) readMessagesWithAckLevel( +func (r *dlqHandlerImpl) readMessagesWithAckLevel( ctx context.Context, sourceCluster string, lastMessageID int64, @@ -142,7 +145,7 @@ func (r *replicationDLQHandlerImpl) readMessagesWithAckLevel( return dlqResponse.ReplicationTasks, ackLevel, resp.NextPageToken, nil } -func (r *replicationDLQHandlerImpl) purgeMessages( +func (r *dlqHandlerImpl) PurgeMessages( sourceCluster string, lastMessageID int64, ) error { @@ -169,7 +172,7 @@ func (r *replicationDLQHandlerImpl) purgeMessages( return nil } -func (r *replicationDLQHandlerImpl) mergeMessages( +func (r *dlqHandlerImpl) MergeMessages( ctx context.Context, sourceCluster string, lastMessageID int64, @@ -186,7 +189,7 @@ func (r *replicationDLQHandlerImpl) mergeMessages( ) for _, task := range tasks { - if _, err := r.replicationTaskExecutor.execute( + if _, err := r.taskExecutor.execute( sourceCluster, task, true, diff --git a/service/history/replication/dlq_handler_mock.go b/service/history/replication/dlq_handler_mock.go new file mode 100644 index 00000000000..3aa894d0159 --- /dev/null +++ b/service/history/replication/dlq_handler_mock.go @@ -0,0 +1,105 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: dlq_handler.go + +// Package replication is a generated GoMock package. +package replication + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + replicator "github.com/uber/cadence/.gen/go/replicator" +) + +// MockDLQHandler is a mock of DLQHandler interface +type MockDLQHandler struct { + ctrl *gomock.Controller + recorder *MockDLQHandlerMockRecorder +} + +// MockDLQHandlerMockRecorder is the mock recorder for MockDLQHandler +type MockDLQHandlerMockRecorder struct { + mock *MockDLQHandler +} + +// NewMockDLQHandler creates a new mock instance +func NewMockDLQHandler(ctrl *gomock.Controller) *MockDLQHandler { + mock := &MockDLQHandler{ctrl: ctrl} + mock.recorder = &MockDLQHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDLQHandler) EXPECT() *MockDLQHandlerMockRecorder { + return m.recorder +} + +// ReadMessages mocks base method +func (m *MockDLQHandler) ReadMessages(ctx context.Context, sourceCluster string, lastMessageID int64, pageSize int, pageToken []byte) ([]*replicator.ReplicationTask, []byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadMessages", ctx, sourceCluster, lastMessageID, pageSize, pageToken) + ret0, _ := ret[0].([]*replicator.ReplicationTask) + ret1, _ := ret[1].([]byte) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ReadMessages indicates an expected call of ReadMessages +func (mr *MockDLQHandlerMockRecorder) ReadMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadMessages", reflect.TypeOf((*MockDLQHandler)(nil).ReadMessages), ctx, sourceCluster, lastMessageID, pageSize, pageToken) +} + +// PurgeMessages mocks base method +func (m *MockDLQHandler) PurgeMessages(sourceCluster string, lastMessageID int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PurgeMessages", sourceCluster, lastMessageID) + ret0, _ := ret[0].(error) + return ret0 +} + +// PurgeMessages indicates an expected call of PurgeMessages +func (mr *MockDLQHandlerMockRecorder) PurgeMessages(sourceCluster, lastMessageID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PurgeMessages", reflect.TypeOf((*MockDLQHandler)(nil).PurgeMessages), sourceCluster, lastMessageID) +} + +// MergeMessages mocks base method +func (m *MockDLQHandler) MergeMessages(ctx context.Context, sourceCluster string, lastMessageID int64, pageSize int, pageToken []byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MergeMessages", ctx, sourceCluster, lastMessageID, pageSize, pageToken) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MergeMessages indicates an expected call of MergeMessages +func (mr *MockDLQHandlerMockRecorder) MergeMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MergeMessages", reflect.TypeOf((*MockDLQHandler)(nil).MergeMessages), ctx, sourceCluster, lastMessageID, pageSize, pageToken) +} diff --git a/service/history/replicationDLQHandler_test.go b/service/history/replication/dlq_handler_test.go similarity index 77% rename from service/history/replicationDLQHandler_test.go rename to service/history/replication/dlq_handler_test.go index 748fb0de57f..4406a2b75b4 100644 --- a/service/history/replicationDLQHandler_test.go +++ b/service/history/replication/dlq_handler_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package replication import ( "context" @@ -42,38 +42,38 @@ import ( ) type ( - replicationDLQHandlerSuite struct { + dlqHandlerSuite struct { suite.Suite *require.Assertions controller *gomock.Controller - mockShard *shard.TestContext - config *config.Config - mockClientBean *client.MockBean - adminClient *adminservicetest.MockClient - clusterMetadata *cluster.MockMetadata - executionManager *mocks.ExecutionManager - shardManager *mocks.ShardManager - replicatorTaskExecutor *MockreplicationTaskExecutor + mockShard *shard.TestContext + config *config.Config + mockClientBean *client.MockBean + adminClient *adminservicetest.MockClient + clusterMetadata *cluster.MockMetadata + executionManager *mocks.ExecutionManager + shardManager *mocks.ShardManager + taskExecutor *MockTaskExecutor - replicationMessageHandler *replicationDLQHandlerImpl + messageHandler *dlqHandlerImpl } ) -func TestReplicationMessageHandlerSuite(t *testing.T) { - s := new(replicationDLQHandlerSuite) +func TestDLQMessageHandlerSuite(t *testing.T) { + s := new(dlqHandlerSuite) suite.Run(t, s) } -func (s *replicationDLQHandlerSuite) SetupSuite() { +func (s *dlqHandlerSuite) SetupSuite() { } -func (s *replicationDLQHandlerSuite) TearDownSuite() { +func (s *dlqHandlerSuite) TearDownSuite() { } -func (s *replicationDLQHandlerSuite) SetupTest() { +func (s *dlqHandlerSuite) SetupTest() { s.Assertions = require.New(s.T()) s.config = config.NewForTest() @@ -96,20 +96,20 @@ func (s *replicationDLQHandlerSuite) SetupTest() { s.shardManager = s.mockShard.Resource.ShardMgr s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("active").AnyTimes() - s.replicatorTaskExecutor = NewMockreplicationTaskExecutor(s.controller) + s.taskExecutor = NewMockTaskExecutor(s.controller) - s.replicationMessageHandler = newReplicationDLQHandler( + s.messageHandler = NewDLQHandler( s.mockShard, - s.replicatorTaskExecutor, - ).(*replicationDLQHandlerImpl) + s.taskExecutor, + ).(*dlqHandlerImpl) } -func (s *replicationDLQHandlerSuite) TearDownTest() { +func (s *dlqHandlerSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) } -func (s *replicationDLQHandlerSuite) TestReadMessages_OK() { +func (s *dlqHandlerSuite) TestReadMessages_OK() { ctx := context.Background() sourceCluster := "test" lastMessageID := int64(1) @@ -141,13 +141,13 @@ func (s *replicationDLQHandlerSuite) TestReadMessages_OK() { s.adminClient.EXPECT(). GetDLQReplicationMessages(ctx, gomock.Any()). Return(&replicator.GetDLQReplicationMessagesResponse{}, nil) - tasks, token, err := s.replicationMessageHandler.readMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken) + tasks, token, err := s.messageHandler.ReadMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken) s.NoError(err) s.Nil(token) s.Nil(tasks) } -func (s *replicationDLQHandlerSuite) TestPurgeMessages_OK() { +func (s *dlqHandlerSuite) TestPurgeMessages_OK() { sourceCluster := "test" lastMessageID := int64(1) @@ -159,11 +159,11 @@ func (s *replicationDLQHandlerSuite) TestPurgeMessages_OK() { }).Return(nil).Times(1) s.shardManager.On("UpdateShard", mock.Anything).Return(nil) - err := s.replicationMessageHandler.purgeMessages(sourceCluster, lastMessageID) + err := s.messageHandler.PurgeMessages(sourceCluster, lastMessageID) s.NoError(err) } -func (s *replicationDLQHandlerSuite) TestMergeMessages_OK() { +func (s *dlqHandlerSuite) TestMergeMessages_OK() { ctx := context.Background() sourceCluster := "test" lastMessageID := int64(1) @@ -204,7 +204,7 @@ func (s *replicationDLQHandlerSuite) TestMergeMessages_OK() { replicationTask, }, }, nil) - s.replicatorTaskExecutor.EXPECT().execute(sourceCluster, replicationTask, true).Return(0, nil).Times(1) + s.taskExecutor.EXPECT().execute(sourceCluster, replicationTask, true).Return(0, nil).Times(1) s.executionManager.On("RangeDeleteReplicationTaskFromDLQ", &persistence.RangeDeleteReplicationTaskFromDLQRequest{ SourceClusterName: sourceCluster, @@ -214,7 +214,7 @@ func (s *replicationDLQHandlerSuite) TestMergeMessages_OK() { s.shardManager.On("UpdateShard", mock.Anything).Return(nil) - token, err := s.replicationMessageHandler.mergeMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken) + token, err := s.messageHandler.MergeMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken) s.NoError(err) s.Nil(token) } diff --git a/service/history/replicationTaskExecutor.go b/service/history/replication/task_executor.go similarity index 92% rename from service/history/replicationTaskExecutor.go rename to service/history/replication/task_executor.go index 8ba50c7ffcc..0bfd805d5d6 100644 --- a/service/history/replicationTaskExecutor.go +++ b/service/history/replication/task_executor.go @@ -18,9 +18,9 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replicationTaskExecutor_mock.go -self_package github.com/uber/cadence/service/history +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination task_executor_mock.go -package history +package replication import ( "context" @@ -38,11 +38,12 @@ import ( ) type ( - replicationTaskExecutor interface { + // TaskExecutor is the executor for replication task + TaskExecutor interface { execute(sourceCluster string, replicationTask *r.ReplicationTask, forceApply bool) (int, error) } - replicationTaskExecutorImpl struct { + taskExecutorImpl struct { currentCluster string domainCache cache.DomainCache nDCHistoryResender xdc.NDCHistoryResender @@ -54,9 +55,11 @@ type ( } ) -// newReplicationTaskExecutor creates an replication task executor +var _ TaskExecutor = (*taskExecutorImpl)(nil) + +// NewTaskExecutor creates an replication task executor // The executor uses by 1) DLQ replication task handler 2) history replication task processor -func newReplicationTaskExecutor( +func NewTaskExecutor( currentCluster string, domainCache cache.DomainCache, nDCHistoryResender xdc.NDCHistoryResender, @@ -64,8 +67,8 @@ func newReplicationTaskExecutor( historyEngine engine.Engine, metricsClient metrics.Client, logger log.Logger, -) replicationTaskExecutor { - return &replicationTaskExecutorImpl{ +) TaskExecutor { + return &taskExecutorImpl{ currentCluster: currentCluster, domainCache: domainCache, nDCHistoryResender: nDCHistoryResender, @@ -76,7 +79,7 @@ func newReplicationTaskExecutor( } } -func (e *replicationTaskExecutorImpl) execute( +func (e *taskExecutorImpl) execute( sourceCluster string, replicationTask *r.ReplicationTask, forceApply bool, @@ -109,7 +112,7 @@ func (e *replicationTaskExecutorImpl) execute( return scope, err } -func (e *replicationTaskExecutorImpl) handleActivityTask( +func (e *taskExecutorImpl) handleActivityTask( task *r.ReplicationTask, forceApply bool, ) error { @@ -191,7 +194,7 @@ func (e *replicationTaskExecutorImpl) handleActivityTask( } //TODO: remove this part after 2DC deprecation -func (e *replicationTaskExecutorImpl) handleHistoryReplicationTask( +func (e *taskExecutorImpl) handleHistoryReplicationTask( sourceCluster string, task *r.ReplicationTask, forceApply bool, @@ -250,7 +253,7 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTask( return e.historyEngine.ReplicateEvents(ctx, request) } -func (e *replicationTaskExecutorImpl) handleHistoryReplicationTaskV2( +func (e *taskExecutorImpl) handleHistoryReplicationTaskV2( task *r.ReplicationTask, forceApply bool, ) error { @@ -301,7 +304,7 @@ func (e *replicationTaskExecutorImpl) handleHistoryReplicationTaskV2( return e.historyEngine.ReplicateEventsV2(ctx, request) } -func (e *replicationTaskExecutorImpl) filterTask( +func (e *taskExecutorImpl) filterTask( domainID string, forceApply bool, ) (bool, error) { @@ -327,7 +330,7 @@ FilterLoop: } //TODO: remove this code after 2DC deprecation -func (e *replicationTaskExecutorImpl) convertRetryTaskError( +func (e *taskExecutorImpl) convertRetryTaskError( err error, ) (*shared.RetryTaskError, bool) { @@ -335,7 +338,7 @@ func (e *replicationTaskExecutorImpl) convertRetryTaskError( return retError, ok } -func (e *replicationTaskExecutorImpl) convertRetryTaskV2Error( +func (e *taskExecutorImpl) convertRetryTaskV2Error( err error, ) (*shared.RetryTaskV2Error, bool) { diff --git a/service/history/replicationTaskExecutor_mock.go b/service/history/replication/task_executor_mock.go similarity index 60% rename from service/history/replicationTaskExecutor_mock.go rename to service/history/replication/task_executor_mock.go index 44633e0e3bb..0d6a4feaeb4 100644 --- a/service/history/replicationTaskExecutor_mock.go +++ b/service/history/replication/task_executor_mock.go @@ -22,10 +22,10 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: replicationTaskExecutor.go +// Source: task_executor.go -// Package history is a generated GoMock package. -package history +// Package replication is a generated GoMock package. +package replication import ( reflect "reflect" @@ -35,31 +35,31 @@ import ( replicator "github.com/uber/cadence/.gen/go/replicator" ) -// MockreplicationTaskExecutor is a mock of replicationTaskExecutor interface -type MockreplicationTaskExecutor struct { +// MockTaskExecutor is a mock of TaskExecutor interface +type MockTaskExecutor struct { ctrl *gomock.Controller - recorder *MockreplicationTaskExecutorMockRecorder + recorder *MockTaskExecutorMockRecorder } -// MockreplicationTaskExecutorMockRecorder is the mock recorder for MockreplicationTaskExecutor -type MockreplicationTaskExecutorMockRecorder struct { - mock *MockreplicationTaskExecutor +// MockTaskExecutorMockRecorder is the mock recorder for MockTaskExecutor +type MockTaskExecutorMockRecorder struct { + mock *MockTaskExecutor } -// NewMockreplicationTaskExecutor creates a new mock instance -func NewMockreplicationTaskExecutor(ctrl *gomock.Controller) *MockreplicationTaskExecutor { - mock := &MockreplicationTaskExecutor{ctrl: ctrl} - mock.recorder = &MockreplicationTaskExecutorMockRecorder{mock} +// NewMockTaskExecutor creates a new mock instance +func NewMockTaskExecutor(ctrl *gomock.Controller) *MockTaskExecutor { + mock := &MockTaskExecutor{ctrl: ctrl} + mock.recorder = &MockTaskExecutorMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockreplicationTaskExecutor) EXPECT() *MockreplicationTaskExecutorMockRecorder { +func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder { return m.recorder } // execute mocks base method -func (m *MockreplicationTaskExecutor) execute(sourceCluster string, replicationTask *replicator.ReplicationTask, forceApply bool) (int, error) { +func (m *MockTaskExecutor) execute(sourceCluster string, replicationTask *replicator.ReplicationTask, forceApply bool) (int, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "execute", sourceCluster, replicationTask, forceApply) ret0, _ := ret[0].(int) @@ -68,7 +68,7 @@ func (m *MockreplicationTaskExecutor) execute(sourceCluster string, replicationT } // execute indicates an expected call of execute -func (mr *MockreplicationTaskExecutorMockRecorder) execute(sourceCluster, replicationTask, forceApply interface{}) *gomock.Call { +func (mr *MockTaskExecutorMockRecorder) execute(sourceCluster, replicationTask, forceApply interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "execute", reflect.TypeOf((*MockreplicationTaskExecutor)(nil).execute), sourceCluster, replicationTask, forceApply) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "execute", reflect.TypeOf((*MockTaskExecutor)(nil).execute), sourceCluster, replicationTask, forceApply) } diff --git a/service/history/replicationTaskExecutor_test.go b/service/history/replication/task_executor_test.go similarity index 71% rename from service/history/replicationTaskExecutor_test.go rename to service/history/replication/task_executor_test.go index 640437b6c11..36f7287b526 100644 --- a/service/history/replicationTaskExecutor_test.go +++ b/service/history/replication/task_executor_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package replication import ( "fmt" @@ -49,43 +49,42 @@ import ( ) type ( - replicationTaskExecutorSuite struct { + taskExecutorSuite struct { suite.Suite *require.Assertions controller *gomock.Controller - currentCluster string - mockShard *shard.TestContext - mockEngine *engine.MockEngine - config *config.Config - historyClient *historyservicetest.MockClient - replicationTaskFetcher *MockReplicationTaskFetcher - mockDomainCache *cache.MockDomainCache - mockClientBean *client.MockBean - adminClient *adminservicetest.MockClient - clusterMetadata *cluster.MockMetadata - executionManager *mocks.ExecutionManager - nDCHistoryResender *xdc.MockNDCHistoryResender - historyRereplicator *xdc.MockHistoryRereplicator - - replicationTaskHandler *replicationTaskExecutorImpl + currentCluster string + mockShard *shard.TestContext + mockEngine *engine.MockEngine + config *config.Config + historyClient *historyservicetest.MockClient + mockDomainCache *cache.MockDomainCache + mockClientBean *client.MockBean + adminClient *adminservicetest.MockClient + clusterMetadata *cluster.MockMetadata + executionManager *mocks.ExecutionManager + nDCHistoryResender *xdc.MockNDCHistoryResender + historyRereplicator *xdc.MockHistoryRereplicator + + taskHandler *taskExecutorImpl } ) -func TestReplicationTaskExecutorSuite(t *testing.T) { - s := new(replicationTaskExecutorSuite) +func TestTaskExecutorSuite(t *testing.T) { + s := new(taskExecutorSuite) suite.Run(t, s) } -func (s *replicationTaskExecutorSuite) SetupSuite() { +func (s *taskExecutorSuite) SetupSuite() { } -func (s *replicationTaskExecutorSuite) TearDownSuite() { +func (s *taskExecutorSuite) TearDownSuite() { } -func (s *replicationTaskExecutorSuite) SetupTest() { +func (s *taskExecutorSuite) SetupTest() { s.Assertions = require.New(s.T()) s.currentCluster = "test" @@ -117,7 +116,7 @@ func (s *replicationTaskExecutorSuite) SetupTest() { metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("active").AnyTimes() - s.replicationTaskHandler = newReplicationTaskExecutor( + s.taskHandler = NewTaskExecutor( s.currentCluster, s.mockDomainCache, s.nDCHistoryResender, @@ -125,39 +124,39 @@ func (s *replicationTaskExecutorSuite) SetupTest() { s.mockEngine, metricsClient, s.mockShard.GetLogger(), - ).(*replicationTaskExecutorImpl) + ).(*taskExecutorImpl) } -func (s *replicationTaskExecutorSuite) TearDownTest() { +func (s *taskExecutorSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) } -func (s *replicationTaskExecutorSuite) TestConvertRetryTaskError_OK() { +func (s *taskExecutorSuite) TestConvertRetryTaskError_OK() { err := &shared.RetryTaskError{} - _, ok := s.replicationTaskHandler.convertRetryTaskError(err) + _, ok := s.taskHandler.convertRetryTaskError(err) s.True(ok) } -func (s *replicationTaskExecutorSuite) TestConvertRetryTaskError_NotOK() { +func (s *taskExecutorSuite) TestConvertRetryTaskError_NotOK() { err := &shared.RetryTaskV2Error{} - _, ok := s.replicationTaskHandler.convertRetryTaskError(err) + _, ok := s.taskHandler.convertRetryTaskError(err) s.False(ok) } -func (s *replicationTaskExecutorSuite) TestConvertRetryTaskV2Error_OK() { +func (s *taskExecutorSuite) TestConvertRetryTaskV2Error_OK() { err := &shared.RetryTaskV2Error{} - _, ok := s.replicationTaskHandler.convertRetryTaskV2Error(err) + _, ok := s.taskHandler.convertRetryTaskV2Error(err) s.True(ok) } -func (s *replicationTaskExecutorSuite) TestConvertRetryTaskV2Error_NotOK() { +func (s *taskExecutorSuite) TestConvertRetryTaskV2Error_NotOK() { err := &shared.RetryTaskError{} - _, ok := s.replicationTaskHandler.convertRetryTaskV2Error(err) + _, ok := s.taskHandler.convertRetryTaskV2Error(err) s.False(ok) } -func (s *replicationTaskExecutorSuite) TestFilterTask() { +func (s *taskExecutorSuite) TestFilterTask() { domainID := uuid.New() s.mockDomainCache.EXPECT(). GetDomainByID(domainID). @@ -173,29 +172,29 @@ func (s *replicationTaskExecutorSuite) TestFilterTask() { 0, s.clusterMetadata, ), nil) - ok, err := s.replicationTaskHandler.filterTask(domainID, false) + ok, err := s.taskHandler.filterTask(domainID, false) s.NoError(err) s.True(ok) } -func (s *replicationTaskExecutorSuite) TestFilterTask_Error() { +func (s *taskExecutorSuite) TestFilterTask_Error() { domainID := uuid.New() s.mockDomainCache.EXPECT(). GetDomainByID(domainID). Return(nil, fmt.Errorf("test")) - ok, err := s.replicationTaskHandler.filterTask(domainID, false) + ok, err := s.taskHandler.filterTask(domainID, false) s.Error(err) s.False(ok) } -func (s *replicationTaskExecutorSuite) TestFilterTask_EnforceApply() { +func (s *taskExecutorSuite) TestFilterTask_EnforceApply() { domainID := uuid.New() - ok, err := s.replicationTaskHandler.filterTask(domainID, true) + ok, err := s.taskHandler.filterTask(domainID, true) s.NoError(err) s.True(ok) } -func (s *replicationTaskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() { +func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() @@ -214,11 +213,11 @@ func (s *replicationTaskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicati } s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil).Times(1) - _, err := s.replicationTaskHandler.execute(s.currentCluster, task, true) + _, err := s.taskHandler.execute(s.currentCluster, task, true) s.NoError(err) } -func (s *replicationTaskExecutorSuite) TestProcessTaskOnce_HistoryReplicationTask() { +func (s *taskExecutorSuite) TestProcessTaskOnce_HistoryReplicationTask() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() @@ -241,11 +240,11 @@ func (s *replicationTaskExecutorSuite) TestProcessTaskOnce_HistoryReplicationTas } s.mockEngine.EXPECT().ReplicateEvents(gomock.Any(), request).Return(nil).Times(1) - _, err := s.replicationTaskHandler.execute(s.currentCluster, task, true) + _, err := s.taskHandler.execute(s.currentCluster, task, true) s.NoError(err) } -func (s *replicationTaskExecutorSuite) TestProcess_HistoryV2ReplicationTask() { +func (s *taskExecutorSuite) TestProcess_HistoryV2ReplicationTask() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() @@ -266,6 +265,6 @@ func (s *replicationTaskExecutorSuite) TestProcess_HistoryV2ReplicationTask() { } s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil).Times(1) - _, err := s.replicationTaskHandler.execute(s.currentCluster, task, true) + _, err := s.taskHandler.execute(s.currentCluster, task, true) s.NoError(err) } diff --git a/service/history/replicationTaskFetcher.go b/service/history/replication/task_fetcher.go similarity index 81% rename from service/history/replicationTaskFetcher.go rename to service/history/replication/task_fetcher.go index ad2b30f1760..ae2f11c7505 100644 --- a/service/history/replicationTaskFetcher.go +++ b/service/history/replication/task_fetcher.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -18,9 +18,9 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replicationTaskFetcher_mock.go -self_package github.com/uber/cadence/service/history +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination task_fetcher_mock.go -self_package github.com/uber/cadence/service/history/replication -package history +package replication import ( "context" @@ -45,50 +45,54 @@ const ( ) type ( - // ReplicationTaskFetcherImpl is the implementation of fetching replication messages. - ReplicationTaskFetcherImpl struct { - status int32 - currentCluster string - sourceCluster string - config *config.Config - logger log.Logger - remotePeer admin.Client - requestChan chan *request - done chan struct{} - } - // ReplicationTaskFetcher is responsible for fetching replication messages from remote DC. - ReplicationTaskFetcher interface { + // TaskFetcher is responsible for fetching replication messages from remote DC. + TaskFetcher interface { common.Daemon GetSourceCluster() string GetRequestChan() chan<- *request } - // ReplicationTaskFetchers is a group of fetchers, one per source DC. - ReplicationTaskFetchers interface { + // TaskFetchers is a group of fetchers, one per source DC. + TaskFetchers interface { common.Daemon - GetFetchers() []ReplicationTaskFetcher + GetFetchers() []TaskFetcher + } + + // taskFetcherImpl is the implementation of fetching replication messages. + taskFetcherImpl struct { + status int32 + currentCluster string + sourceCluster string + config *config.Config + logger log.Logger + remotePeer admin.Client + requestChan chan *request + done chan struct{} } - // ReplicationTaskFetchersImpl is a group of fetchers, one per source DC. - ReplicationTaskFetchersImpl struct { + // taskFetchersImpl is a group of fetchers, one per source DC. + taskFetchersImpl struct { status int32 logger log.Logger - fetchers []ReplicationTaskFetcher + fetchers []TaskFetcher } ) -// NewReplicationTaskFetchers creates an instance of ReplicationTaskFetchers with given configs. -func NewReplicationTaskFetchers( +var _ TaskFetcher = (*taskFetcherImpl)(nil) +var _ TaskFetchers = (*taskFetchersImpl)(nil) + +// NewTaskFetchers creates an instance of ReplicationTaskFetchers with given configs. +func NewTaskFetchers( logger log.Logger, config *config.Config, consumerConfig *serviceConfig.ReplicationConsumerConfig, clusterMetadata cluster.Metadata, clientBean client.Bean, -) *ReplicationTaskFetchersImpl { +) TaskFetchers { - var fetchers []ReplicationTaskFetcher + var fetchers []TaskFetcher if consumerConfig.Type == serviceConfig.ReplicationConsumerTypeRPC { for clusterName, info := range clusterMetadata.GetAllClusterInfo() { if !info.Enabled { @@ -110,7 +114,7 @@ func NewReplicationTaskFetchers( } } - return &ReplicationTaskFetchersImpl{ + return &taskFetchersImpl{ fetchers: fetchers, status: common.DaemonStatusInitialized, logger: logger, @@ -118,7 +122,7 @@ func NewReplicationTaskFetchers( } // Start starts the fetchers -func (f *ReplicationTaskFetchersImpl) Start() { +func (f *taskFetchersImpl) Start() { if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return } @@ -130,7 +134,7 @@ func (f *ReplicationTaskFetchersImpl) Start() { } // Stop stops the fetchers -func (f *ReplicationTaskFetchersImpl) Stop() { +func (f *taskFetchersImpl) Stop() { if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { return } @@ -142,7 +146,7 @@ func (f *ReplicationTaskFetchersImpl) Stop() { } // GetFetchers returns all the fetchers -func (f *ReplicationTaskFetchersImpl) GetFetchers() []ReplicationTaskFetcher { +func (f *taskFetchersImpl) GetFetchers() []TaskFetcher { return f.fetchers } @@ -153,9 +157,9 @@ func newReplicationTaskFetcher( currentCluster string, config *config.Config, sourceFrontend admin.Client, -) *ReplicationTaskFetcherImpl { +) TaskFetcher { - return &ReplicationTaskFetcherImpl{ + return &taskFetcherImpl{ status: common.DaemonStatusInitialized, config: config, logger: logger.WithTags(tag.ClusterName(sourceCluster)), @@ -168,7 +172,7 @@ func newReplicationTaskFetcher( } // Start starts the fetcher -func (f *ReplicationTaskFetcherImpl) Start() { +func (f *taskFetcherImpl) Start() { if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return } @@ -180,7 +184,7 @@ func (f *ReplicationTaskFetcherImpl) Start() { } // Stop stops the fetcher -func (f *ReplicationTaskFetcherImpl) Stop() { +func (f *taskFetcherImpl) Stop() { if !atomic.CompareAndSwapInt32(&f.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { return } @@ -190,7 +194,7 @@ func (f *ReplicationTaskFetcherImpl) Stop() { } // fetchTasks collects getReplicationTasks request from shards and send out aggregated request to source frontend. -func (f *ReplicationTaskFetcherImpl) fetchTasks() { +func (f *taskFetcherImpl) fetchTasks() { timer := time.NewTimer(backoff.JitDuration( f.config.ReplicationTaskFetcherAggregationInterval(), f.config.ReplicationTaskFetcherTimerJitterCoefficient(), @@ -233,7 +237,7 @@ func (f *ReplicationTaskFetcherImpl) fetchTasks() { } } -func (f *ReplicationTaskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*request) error { +func (f *taskFetcherImpl) fetchAndDistributeTasks(requestByShard map[int32]*request) error { if len(requestByShard) == 0 { // We don't receive tasks from previous fetch so processors are all sleeping. f.logger.Debug("Skip fetching as no processor is asking for tasks.") @@ -258,7 +262,7 @@ func (f *ReplicationTaskFetcherImpl) fetchAndDistributeTasks(requestByShard map[ return nil } -func (f *ReplicationTaskFetcherImpl) getMessages( +func (f *taskFetcherImpl) getMessages( requestByShard map[int32]*request, ) (map[int32]*r.ReplicationMessages, error) { var tokens []*r.ReplicationToken @@ -282,11 +286,11 @@ func (f *ReplicationTaskFetcherImpl) getMessages( } // GetSourceCluster returns the source cluster for the fetcher -func (f *ReplicationTaskFetcherImpl) GetSourceCluster() string { +func (f *taskFetcherImpl) GetSourceCluster() string { return f.sourceCluster } // GetRequestChan returns the request chan for the fetcher -func (f *ReplicationTaskFetcherImpl) GetRequestChan() chan<- *request { +func (f *taskFetcherImpl) GetRequestChan() chan<- *request { return f.requestChan } diff --git a/service/history/replicationTaskFetcher_mock.go b/service/history/replication/task_fetcher_mock.go similarity index 52% rename from service/history/replicationTaskFetcher_mock.go rename to service/history/replication/task_fetcher_mock.go index 41680f74e11..d50e87ee89e 100644 --- a/service/history/replicationTaskFetcher_mock.go +++ b/service/history/replication/task_fetcher_mock.go @@ -22,10 +22,10 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: replicationTaskFetcher.go +// Source: task_fetcher.go -// Package history is a generated GoMock package. -package history +// Package replication is a generated GoMock package. +package replication import ( reflect "reflect" @@ -33,55 +33,55 @@ import ( gomock "github.com/golang/mock/gomock" ) -// MockReplicationTaskFetcher is a mock of ReplicationTaskFetcher interface -type MockReplicationTaskFetcher struct { +// MockTaskFetcher is a mock of TaskFetcher interface +type MockTaskFetcher struct { ctrl *gomock.Controller - recorder *MockReplicationTaskFetcherMockRecorder + recorder *MockTaskFetcherMockRecorder } -// MockReplicationTaskFetcherMockRecorder is the mock recorder for MockReplicationTaskFetcher -type MockReplicationTaskFetcherMockRecorder struct { - mock *MockReplicationTaskFetcher +// MockTaskFetcherMockRecorder is the mock recorder for MockTaskFetcher +type MockTaskFetcherMockRecorder struct { + mock *MockTaskFetcher } -// NewMockReplicationTaskFetcher creates a new mock instance -func NewMockReplicationTaskFetcher(ctrl *gomock.Controller) *MockReplicationTaskFetcher { - mock := &MockReplicationTaskFetcher{ctrl: ctrl} - mock.recorder = &MockReplicationTaskFetcherMockRecorder{mock} +// NewMockTaskFetcher creates a new mock instance +func NewMockTaskFetcher(ctrl *gomock.Controller) *MockTaskFetcher { + mock := &MockTaskFetcher{ctrl: ctrl} + mock.recorder = &MockTaskFetcherMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockReplicationTaskFetcher) EXPECT() *MockReplicationTaskFetcherMockRecorder { +func (m *MockTaskFetcher) EXPECT() *MockTaskFetcherMockRecorder { return m.recorder } // Start mocks base method -func (m *MockReplicationTaskFetcher) Start() { +func (m *MockTaskFetcher) Start() { m.ctrl.T.Helper() m.ctrl.Call(m, "Start") } // Start indicates an expected call of Start -func (mr *MockReplicationTaskFetcherMockRecorder) Start() *gomock.Call { +func (mr *MockTaskFetcherMockRecorder) Start() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockReplicationTaskFetcher)(nil).Start)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTaskFetcher)(nil).Start)) } // Stop mocks base method -func (m *MockReplicationTaskFetcher) Stop() { +func (m *MockTaskFetcher) Stop() { m.ctrl.T.Helper() m.ctrl.Call(m, "Stop") } // Stop indicates an expected call of Stop -func (mr *MockReplicationTaskFetcherMockRecorder) Stop() *gomock.Call { +func (mr *MockTaskFetcherMockRecorder) Stop() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockReplicationTaskFetcher)(nil).Stop)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockTaskFetcher)(nil).Stop)) } // GetSourceCluster mocks base method -func (m *MockReplicationTaskFetcher) GetSourceCluster() string { +func (m *MockTaskFetcher) GetSourceCluster() string { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSourceCluster") ret0, _ := ret[0].(string) @@ -89,13 +89,13 @@ func (m *MockReplicationTaskFetcher) GetSourceCluster() string { } // GetSourceCluster indicates an expected call of GetSourceCluster -func (mr *MockReplicationTaskFetcherMockRecorder) GetSourceCluster() *gomock.Call { +func (mr *MockTaskFetcherMockRecorder) GetSourceCluster() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSourceCluster", reflect.TypeOf((*MockReplicationTaskFetcher)(nil).GetSourceCluster)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSourceCluster", reflect.TypeOf((*MockTaskFetcher)(nil).GetSourceCluster)) } // GetRequestChan mocks base method -func (m *MockReplicationTaskFetcher) GetRequestChan() chan<- *request { +func (m *MockTaskFetcher) GetRequestChan() chan<- *request { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetRequestChan") ret0, _ := ret[0].(chan<- *request) @@ -103,68 +103,68 @@ func (m *MockReplicationTaskFetcher) GetRequestChan() chan<- *request { } // GetRequestChan indicates an expected call of GetRequestChan -func (mr *MockReplicationTaskFetcherMockRecorder) GetRequestChan() *gomock.Call { +func (mr *MockTaskFetcherMockRecorder) GetRequestChan() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRequestChan", reflect.TypeOf((*MockReplicationTaskFetcher)(nil).GetRequestChan)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRequestChan", reflect.TypeOf((*MockTaskFetcher)(nil).GetRequestChan)) } -// MockReplicationTaskFetchers is a mock of ReplicationTaskFetchers interface -type MockReplicationTaskFetchers struct { +// MockTaskFetchers is a mock of TaskFetchers interface +type MockTaskFetchers struct { ctrl *gomock.Controller - recorder *MockReplicationTaskFetchersMockRecorder + recorder *MockTaskFetchersMockRecorder } -// MockReplicationTaskFetchersMockRecorder is the mock recorder for MockReplicationTaskFetchers -type MockReplicationTaskFetchersMockRecorder struct { - mock *MockReplicationTaskFetchers +// MockTaskFetchersMockRecorder is the mock recorder for MockTaskFetchers +type MockTaskFetchersMockRecorder struct { + mock *MockTaskFetchers } -// NewMockReplicationTaskFetchers creates a new mock instance -func NewMockReplicationTaskFetchers(ctrl *gomock.Controller) *MockReplicationTaskFetchers { - mock := &MockReplicationTaskFetchers{ctrl: ctrl} - mock.recorder = &MockReplicationTaskFetchersMockRecorder{mock} +// NewMockTaskFetchers creates a new mock instance +func NewMockTaskFetchers(ctrl *gomock.Controller) *MockTaskFetchers { + mock := &MockTaskFetchers{ctrl: ctrl} + mock.recorder = &MockTaskFetchersMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockReplicationTaskFetchers) EXPECT() *MockReplicationTaskFetchersMockRecorder { +func (m *MockTaskFetchers) EXPECT() *MockTaskFetchersMockRecorder { return m.recorder } // Start mocks base method -func (m *MockReplicationTaskFetchers) Start() { +func (m *MockTaskFetchers) Start() { m.ctrl.T.Helper() m.ctrl.Call(m, "Start") } // Start indicates an expected call of Start -func (mr *MockReplicationTaskFetchersMockRecorder) Start() *gomock.Call { +func (mr *MockTaskFetchersMockRecorder) Start() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockReplicationTaskFetchers)(nil).Start)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTaskFetchers)(nil).Start)) } // Stop mocks base method -func (m *MockReplicationTaskFetchers) Stop() { +func (m *MockTaskFetchers) Stop() { m.ctrl.T.Helper() m.ctrl.Call(m, "Stop") } // Stop indicates an expected call of Stop -func (mr *MockReplicationTaskFetchersMockRecorder) Stop() *gomock.Call { +func (mr *MockTaskFetchersMockRecorder) Stop() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockReplicationTaskFetchers)(nil).Stop)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockTaskFetchers)(nil).Stop)) } // GetFetchers mocks base method -func (m *MockReplicationTaskFetchers) GetFetchers() []ReplicationTaskFetcher { +func (m *MockTaskFetchers) GetFetchers() []TaskFetcher { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetFetchers") - ret0, _ := ret[0].([]ReplicationTaskFetcher) + ret0, _ := ret[0].([]TaskFetcher) return ret0 } // GetFetchers indicates an expected call of GetFetchers -func (mr *MockReplicationTaskFetchersMockRecorder) GetFetchers() *gomock.Call { +func (mr *MockTaskFetchersMockRecorder) GetFetchers() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFetchers", reflect.TypeOf((*MockReplicationTaskFetchers)(nil).GetFetchers)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFetchers", reflect.TypeOf((*MockTaskFetchers)(nil).GetFetchers)) } diff --git a/service/history/replicationTaskFetcher_test.go b/service/history/replication/task_fetcher_test.go similarity index 81% rename from service/history/replicationTaskFetcher_test.go rename to service/history/replication/task_fetcher_test.go index 07ef537932f..9b659089bc4 100644 --- a/service/history/replicationTaskFetcher_test.go +++ b/service/history/replication/task_fetcher_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package replication import ( "testing" @@ -37,32 +37,32 @@ import ( ) type ( - replicationTaskFetcherSuite struct { + taskFetcherSuite struct { suite.Suite *require.Assertions controller *gomock.Controller - mockResource *resource.Test - config *config.Config - frontendClient *adminservicetest.MockClient - replicationTaskFetcher *ReplicationTaskFetcherImpl + mockResource *resource.Test + config *config.Config + frontendClient *adminservicetest.MockClient + taskFetcher *taskFetcherImpl } ) func TestReplicationTaskFetcherSuite(t *testing.T) { - s := new(replicationTaskFetcherSuite) + s := new(taskFetcherSuite) suite.Run(t, s) } -func (s *replicationTaskFetcherSuite) SetupSuite() { +func (s *taskFetcherSuite) SetupSuite() { } -func (s *replicationTaskFetcherSuite) TearDownSuite() { +func (s *taskFetcherSuite) TearDownSuite() { } -func (s *replicationTaskFetcherSuite) SetupTest() { +func (s *taskFetcherSuite) SetupTest() { s.Assertions = require.New(s.T()) s.controller = gomock.NewController(s.T()) @@ -71,21 +71,21 @@ func (s *replicationTaskFetcherSuite) SetupTest() { logger := log.NewNoop() s.config = config.NewForTest() - s.replicationTaskFetcher = newReplicationTaskFetcher( + s.taskFetcher = newReplicationTaskFetcher( logger, "standby", "active", s.config, s.frontendClient, - ) + ).(*taskFetcherImpl) } -func (s *replicationTaskFetcherSuite) TearDownTest() { +func (s *taskFetcherSuite) TearDownTest() { s.controller.Finish() s.mockResource.Finish(s.T()) } -func (s *replicationTaskFetcherSuite) TestGetMessages() { +func (s *taskFetcherSuite) TestGetMessages() { requestByShard := make(map[int32]*request) token := &replicator.ReplicationToken{ ShardID: common.Int32Ptr(0), @@ -107,12 +107,12 @@ func (s *replicationTaskFetcherSuite) TestGetMessages() { MessagesByShard: messageByShared, } s.frontendClient.EXPECT().GetReplicationMessages(gomock.Any(), replicationMessageRequest).Return(expectedResponse, nil) - response, err := s.replicationTaskFetcher.getMessages(requestByShard) + response, err := s.taskFetcher.getMessages(requestByShard) s.NoError(err) s.Equal(messageByShared, response) } -func (s *replicationTaskFetcherSuite) TestFetchAndDistributeTasks() { +func (s *taskFetcherSuite) TestFetchAndDistributeTasks() { requestByShard := make(map[int32]*request) token := &replicator.ReplicationToken{ ShardID: common.Int32Ptr(0), @@ -136,7 +136,7 @@ func (s *replicationTaskFetcherSuite) TestFetchAndDistributeTasks() { MessagesByShard: messageByShared, } s.frontendClient.EXPECT().GetReplicationMessages(gomock.Any(), replicationMessageRequest).Return(expectedResponse, nil) - err := s.replicationTaskFetcher.fetchAndDistributeTasks(requestByShard) + err := s.taskFetcher.fetchAndDistributeTasks(requestByShard) s.NoError(err) respToken := <-respChan s.Equal(messageByShared[0], respToken) diff --git a/service/history/replicationTaskProcessor.go b/service/history/replication/task_processor.go similarity index 82% rename from service/history/replicationTaskProcessor.go rename to service/history/replication/task_processor.go index 5551bd70326..74208048cf9 100644 --- a/service/history/replicationTaskProcessor.go +++ b/service/history/replication/task_processor.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -18,9 +18,9 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination replicationTaskProcessor_mock.go -self_package github.com/uber/cadence/service/history +//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination task_processor_mock.go -self_package github.com/uber/cadence/service/history/replication -package history +package replication import ( "context" @@ -51,7 +51,6 @@ const ( replicationTimeout = 30 * time.Second taskErrorRetryBackoffCoefficient = 1.2 dlqErrorRetryWait = time.Second - emptyMessageID = -1 ) var ( @@ -60,18 +59,23 @@ var ( ) type ( - // ReplicationTaskProcessorImpl is responsible for processing replication tasks for a shard. - ReplicationTaskProcessorImpl struct { - currentCluster string - sourceCluster string - status int32 - shard shard.Context - historyEngine engine.Engine - historySerializer persistence.PayloadSerializer - config *config.Config - metricsClient metrics.Client - logger log.Logger - replicationTaskExecutor replicationTaskExecutor + // TaskProcessor is responsible for processing replication tasks for a shard. + TaskProcessor interface { + common.Daemon + } + + // taskProcessorImpl is responsible for processing replication tasks for a shard. + taskProcessorImpl struct { + currentCluster string + sourceCluster string + status int32 + shard shard.Context + historyEngine engine.Engine + historySerializer persistence.PayloadSerializer + config *config.Config + metricsClient metrics.Client + logger log.Logger + taskExecutor TaskExecutor taskRetryPolicy backoff.RetryPolicy dlqRetryPolicy backoff.RetryPolicy @@ -85,26 +89,23 @@ type ( done chan struct{} } - // ReplicationTaskProcessor is responsible for processing replication tasks for a shard. - ReplicationTaskProcessor interface { - common.Daemon - } - request struct { token *r.ReplicationToken respChan chan<- *r.ReplicationMessages } ) -// NewReplicationTaskProcessor creates a new replication task processor. -func NewReplicationTaskProcessor( +var _ TaskProcessor = (*taskProcessorImpl)(nil) + +// NewTaskProcessor creates a new replication task processor. +func NewTaskProcessor( shard shard.Context, historyEngine engine.Engine, config *config.Config, metricsClient metrics.Client, - replicationTaskFetcher ReplicationTaskFetcher, - replicationTaskExecutor replicationTaskExecutor, -) *ReplicationTaskProcessorImpl { + taskFetcher TaskFetcher, + taskExecutor TaskExecutor, +) TaskProcessor { taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait()) taskRetryPolicy.SetBackoffCoefficient(taskErrorRetryBackoffCoefficient) taskRetryPolicy.SetMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts()) @@ -116,29 +117,29 @@ func NewReplicationTaskProcessor( noTaskBackoffPolicy.SetBackoffCoefficient(1) noTaskBackoffPolicy.SetExpirationInterval(backoff.NoInterval) noTaskRetrier := backoff.NewRetrier(noTaskBackoffPolicy, backoff.SystemClock) - return &ReplicationTaskProcessorImpl{ - currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), - sourceCluster: replicationTaskFetcher.GetSourceCluster(), - status: common.DaemonStatusInitialized, - shard: shard, - historyEngine: historyEngine, - historySerializer: persistence.NewPayloadSerializer(), - config: config, - metricsClient: metricsClient, - logger: shard.GetLogger(), - replicationTaskExecutor: replicationTaskExecutor, - taskRetryPolicy: taskRetryPolicy, - noTaskRetrier: noTaskRetrier, - requestChan: replicationTaskFetcher.GetRequestChan(), - syncShardChan: make(chan *r.SyncShardStatus), - done: make(chan struct{}), - lastProcessedMessageID: emptyMessageID, - lastRetrievedMessageID: emptyMessageID, + return &taskProcessorImpl{ + currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), + sourceCluster: taskFetcher.GetSourceCluster(), + status: common.DaemonStatusInitialized, + shard: shard, + historyEngine: historyEngine, + historySerializer: persistence.NewPayloadSerializer(), + config: config, + metricsClient: metricsClient, + logger: shard.GetLogger(), + taskExecutor: taskExecutor, + taskRetryPolicy: taskRetryPolicy, + noTaskRetrier: noTaskRetrier, + requestChan: taskFetcher.GetRequestChan(), + syncShardChan: make(chan *r.SyncShardStatus), + done: make(chan struct{}), + lastProcessedMessageID: common.EmptyMessageID, + lastRetrievedMessageID: common.EmptyMessageID, } } // Start starts the processor -func (p *ReplicationTaskProcessorImpl) Start() { +func (p *taskProcessorImpl) Start() { if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return } @@ -150,7 +151,7 @@ func (p *ReplicationTaskProcessorImpl) Start() { } // Stop stops the processor -func (p *ReplicationTaskProcessorImpl) Stop() { +func (p *taskProcessorImpl) Stop() { if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { return } @@ -159,7 +160,7 @@ func (p *ReplicationTaskProcessorImpl) Stop() { close(p.done) } -func (p *ReplicationTaskProcessorImpl) processorLoop() { +func (p *taskProcessorImpl) processorLoop() { p.lastProcessedMessageID = p.shard.GetClusterReplicationLevel(p.sourceCluster) defer func() { @@ -198,7 +199,7 @@ Loop: } } -func (p *ReplicationTaskProcessorImpl) cleanupReplicationTaskLoop() { +func (p *taskProcessorImpl) cleanupReplicationTaskLoop() { timer := time.NewTimer(backoff.JitDuration( p.config.ReplicationTaskProcessorCleanupInterval(), @@ -223,7 +224,7 @@ func (p *ReplicationTaskProcessorImpl) cleanupReplicationTaskLoop() { } } -func (p *ReplicationTaskProcessorImpl) cleanupAckedReplicationTasks() error { +func (p *taskProcessorImpl) cleanupAckedReplicationTasks() error { clusterMetadata := p.shard.GetClusterMetadata() currentCluster := clusterMetadata.GetCurrentClusterName() @@ -256,7 +257,7 @@ func (p *ReplicationTaskProcessorImpl) cleanupAckedReplicationTasks() error { ) } -func (p *ReplicationTaskProcessorImpl) sendFetchMessageRequest() <-chan *r.ReplicationMessages { +func (p *taskProcessorImpl) sendFetchMessageRequest() <-chan *r.ReplicationMessages { respChan := make(chan *r.ReplicationMessages, 1) // TODO: when we support prefetching, LastRetrievedMessageId can be different than LastProcessedMessageId p.requestChan <- &request{ @@ -270,7 +271,7 @@ func (p *ReplicationTaskProcessorImpl) sendFetchMessageRequest() <-chan *r.Repli return respChan } -func (p *ReplicationTaskProcessorImpl) processResponse(response *r.ReplicationMessages) { +func (p *taskProcessorImpl) processResponse(response *r.ReplicationMessages) { p.syncShardChan <- response.GetSyncShardStatus() // Note here we check replication tasks instead of hasMore. The expectation is that in a steady state @@ -297,7 +298,7 @@ func (p *ReplicationTaskProcessorImpl) processResponse(response *r.ReplicationMe p.noTaskRetrier.Reset() } -func (p *ReplicationTaskProcessorImpl) syncShardStatusLoop() { +func (p *taskProcessorImpl) syncShardStatusLoop() { timer := time.NewTimer(backoff.JitDuration( p.config.ShardSyncMinInterval(), @@ -326,7 +327,7 @@ func (p *ReplicationTaskProcessorImpl) syncShardStatusLoop() { } } -func (p *ReplicationTaskProcessorImpl) handleSyncShardStatus( +func (p *taskProcessorImpl) handleSyncShardStatus( status *r.SyncShardStatus, ) error { @@ -345,7 +346,7 @@ func (p *ReplicationTaskProcessorImpl) handleSyncShardStatus( }) } -func (p *ReplicationTaskProcessorImpl) processSingleTask(replicationTask *r.ReplicationTask) error { +func (p *taskProcessorImpl) processSingleTask(replicationTask *r.ReplicationTask) error { err := backoff.Retry(func() error { return p.processTaskOnce(replicationTask) }, p.taskRetryPolicy, isTransientRetryableError) @@ -363,8 +364,8 @@ func (p *ReplicationTaskProcessorImpl) processSingleTask(replicationTask *r.Repl return nil } -func (p *ReplicationTaskProcessorImpl) processTaskOnce(replicationTask *r.ReplicationTask) error { - scope, err := p.replicationTaskExecutor.execute( +func (p *taskProcessorImpl) processTaskOnce(replicationTask *r.ReplicationTask) error { + scope, err := p.taskExecutor.execute( p.sourceCluster, replicationTask, false) @@ -382,7 +383,7 @@ func (p *ReplicationTaskProcessorImpl) processTaskOnce(replicationTask *r.Replic return err } -func (p *ReplicationTaskProcessorImpl) putReplicationTaskToDLQ(replicationTask *r.ReplicationTask) error { +func (p *taskProcessorImpl) putReplicationTaskToDLQ(replicationTask *r.ReplicationTask) error { request, err := p.generateDLQRequest(replicationTask) if err != nil { p.logger.Error("Failed to generate DLQ replication task.", tag.Error(err)) @@ -415,7 +416,7 @@ func (p *ReplicationTaskProcessorImpl) putReplicationTaskToDLQ(replicationTask * }, p.dlqRetryPolicy, p.shouldRetryDLQ) } -func (p *ReplicationTaskProcessorImpl) generateDLQRequest( +func (p *taskProcessorImpl) generateDLQRequest( replicationTask *r.ReplicationTask, ) (*persistence.PutReplicationTaskToDLQRequest, error) { switch *replicationTask.TaskType { @@ -491,7 +492,7 @@ func isTransientRetryableError(err error) bool { } } -func (p *ReplicationTaskProcessorImpl) shouldRetryDLQ(err error) bool { +func (p *taskProcessorImpl) shouldRetryDLQ(err error) bool { if err == nil { return false } @@ -519,7 +520,7 @@ func toPersistenceReplicationInfo( return replicationInfoMap } -func (p *ReplicationTaskProcessorImpl) updateFailureMetric(scope int, err error) { +func (p *taskProcessorImpl) updateFailureMetric(scope int, err error) { // Always update failure counter for all replicator errors p.metricsClient.IncCounter(scope, metrics.ReplicatorFailures) diff --git a/service/history/replicationTaskProcessor_mock.go b/service/history/replication/task_processor_mock.go similarity index 60% rename from service/history/replicationTaskProcessor_mock.go rename to service/history/replication/task_processor_mock.go index dd9349de4d3..d8e2b8fa10a 100644 --- a/service/history/replicationTaskProcessor_mock.go +++ b/service/history/replication/task_processor_mock.go @@ -22,10 +22,10 @@ // // Code generated by MockGen. DO NOT EDIT. -// Source: replicationTaskProcessor.go +// Source: task_processor.go -// Package history is a generated GoMock package. -package history +// Package replication is a generated GoMock package. +package replication import ( reflect "reflect" @@ -33,49 +33,49 @@ import ( gomock "github.com/golang/mock/gomock" ) -// MockReplicationTaskProcessor is a mock of ReplicationTaskProcessor interface -type MockReplicationTaskProcessor struct { +// MockTaskProcessor is a mock of TaskProcessor interface +type MockTaskProcessor struct { ctrl *gomock.Controller - recorder *MockReplicationTaskProcessorMockRecorder + recorder *MockTaskProcessorMockRecorder } -// MockReplicationTaskProcessorMockRecorder is the mock recorder for MockReplicationTaskProcessor -type MockReplicationTaskProcessorMockRecorder struct { - mock *MockReplicationTaskProcessor +// MockTaskProcessorMockRecorder is the mock recorder for MockTaskProcessor +type MockTaskProcessorMockRecorder struct { + mock *MockTaskProcessor } -// NewMockReplicationTaskProcessor creates a new mock instance -func NewMockReplicationTaskProcessor(ctrl *gomock.Controller) *MockReplicationTaskProcessor { - mock := &MockReplicationTaskProcessor{ctrl: ctrl} - mock.recorder = &MockReplicationTaskProcessorMockRecorder{mock} +// NewMockTaskProcessor creates a new mock instance +func NewMockTaskProcessor(ctrl *gomock.Controller) *MockTaskProcessor { + mock := &MockTaskProcessor{ctrl: ctrl} + mock.recorder = &MockTaskProcessorMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use -func (m *MockReplicationTaskProcessor) EXPECT() *MockReplicationTaskProcessorMockRecorder { +func (m *MockTaskProcessor) EXPECT() *MockTaskProcessorMockRecorder { return m.recorder } // Start mocks base method -func (m *MockReplicationTaskProcessor) Start() { +func (m *MockTaskProcessor) Start() { m.ctrl.T.Helper() m.ctrl.Call(m, "Start") } // Start indicates an expected call of Start -func (mr *MockReplicationTaskProcessorMockRecorder) Start() *gomock.Call { +func (mr *MockTaskProcessorMockRecorder) Start() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockReplicationTaskProcessor)(nil).Start)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTaskProcessor)(nil).Start)) } // Stop mocks base method -func (m *MockReplicationTaskProcessor) Stop() { +func (m *MockTaskProcessor) Stop() { m.ctrl.T.Helper() m.ctrl.Call(m, "Stop") } // Stop indicates an expected call of Stop -func (mr *MockReplicationTaskProcessorMockRecorder) Stop() *gomock.Call { +func (mr *MockTaskProcessorMockRecorder) Stop() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockReplicationTaskProcessor)(nil).Stop)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockTaskProcessor)(nil).Stop)) } diff --git a/service/history/replicationTaskProcessor_test.go b/service/history/replication/task_processor_test.go similarity index 73% rename from service/history/replicationTaskProcessor_test.go rename to service/history/replication/task_processor_test.go index b78ec9edb35..1d99f893861 100644 --- a/service/history/replicationTaskProcessor_test.go +++ b/service/history/replication/task_processor_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package history +package replication import ( "testing" @@ -48,42 +48,42 @@ import ( ) type ( - replicationTaskProcessorSuite struct { + taskProcessorSuite struct { suite.Suite *require.Assertions controller *gomock.Controller - mockShard *shard.TestContext - mockEngine *engine.MockEngine - config *config.Config - historyClient *historyservicetest.MockClient - replicationTaskFetcher *MockReplicationTaskFetcher - mockDomainCache *cache.MockDomainCache - mockClientBean *client.MockBean - adminClient *adminservicetest.MockClient - clusterMetadata *cluster.MockMetadata - executionManager *mocks.ExecutionManager - requestChan chan *request - replicationTaskExecutor *MockreplicationTaskExecutor - - replicationTaskProcessor *ReplicationTaskProcessorImpl + mockShard *shard.TestContext + mockEngine *engine.MockEngine + config *config.Config + historyClient *historyservicetest.MockClient + taskFetcher *MockTaskFetcher + mockDomainCache *cache.MockDomainCache + mockClientBean *client.MockBean + adminClient *adminservicetest.MockClient + clusterMetadata *cluster.MockMetadata + executionManager *mocks.ExecutionManager + requestChan chan *request + taskExecutor *MockTaskExecutor + + taskProcessor *taskProcessorImpl } ) -func TestReplicationTaskProcessorSuite(t *testing.T) { - s := new(replicationTaskProcessorSuite) +func TestTaskProcessorSuite(t *testing.T) { + s := new(taskProcessorSuite) suite.Run(t, s) } -func (s *replicationTaskProcessorSuite) SetupSuite() { +func (s *taskProcessorSuite) SetupSuite() { } -func (s *replicationTaskProcessorSuite) TearDownSuite() { +func (s *taskProcessorSuite) TearDownSuite() { } -func (s *replicationTaskProcessorSuite) SetupTest() { +func (s *taskProcessorSuite) SetupTest() { s.Assertions = require.New(s.T()) s.controller = gomock.NewController(s.T()) @@ -102,7 +102,7 @@ func (s *replicationTaskProcessorSuite) SetupTest() { s.adminClient = s.mockShard.Resource.RemoteAdminClient s.clusterMetadata = s.mockShard.Resource.ClusterMetadata s.executionManager = s.mockShard.Resource.ExecutionMgr - s.replicationTaskExecutor = NewMockreplicationTaskExecutor(s.controller) + s.taskExecutor = NewMockTaskExecutor(s.controller) s.mockEngine = engine.NewMockEngine(s.controller) s.config = config.NewForTest() @@ -110,29 +110,29 @@ func (s *replicationTaskProcessorSuite) SetupTest() { metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) s.requestChan = make(chan *request, 10) - s.replicationTaskFetcher = NewMockReplicationTaskFetcher(s.controller) + s.taskFetcher = NewMockTaskFetcher(s.controller) - s.replicationTaskFetcher.EXPECT().GetSourceCluster().Return("standby").AnyTimes() - s.replicationTaskFetcher.EXPECT().GetRequestChan().Return(s.requestChan).AnyTimes() + s.taskFetcher.EXPECT().GetSourceCluster().Return("standby").AnyTimes() + s.taskFetcher.EXPECT().GetRequestChan().Return(s.requestChan).AnyTimes() s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("active").AnyTimes() - s.replicationTaskProcessor = NewReplicationTaskProcessor( + s.taskProcessor = NewTaskProcessor( s.mockShard, s.mockEngine, s.config, metricsClient, - s.replicationTaskFetcher, - s.replicationTaskExecutor, - ) + s.taskFetcher, + s.taskExecutor, + ).(*taskProcessorImpl) } -func (s *replicationTaskProcessorSuite) TearDownTest() { +func (s *taskProcessorSuite) TearDownTest() { s.controller.Finish() s.mockShard.Finish(s.T()) } -func (s *replicationTaskProcessorSuite) TestSendFetchMessageRequest() { - s.replicationTaskProcessor.sendFetchMessageRequest() +func (s *taskProcessorSuite) TestSendFetchMessageRequest() { + s.taskProcessor.sendFetchMessageRequest() requestMessage := <-s.requestChan s.Equal(int32(0), requestMessage.token.GetShardID()) @@ -140,7 +140,7 @@ func (s *replicationTaskProcessorSuite) TestSendFetchMessageRequest() { s.Equal(int64(-1), requestMessage.token.GetLastRetrievedMessageId()) } -func (s *replicationTaskProcessorSuite) TestHandleSyncShardStatus() { +func (s *taskProcessorSuite) TestHandleSyncShardStatus() { now := time.Now() s.mockEngine.EXPECT().SyncShardStatus(gomock.Any(), &history.SyncShardStatusRequest{ SourceCluster: common.StringPtr("standby"), @@ -148,13 +148,13 @@ func (s *replicationTaskProcessorSuite) TestHandleSyncShardStatus() { Timestamp: common.Int64Ptr(now.UnixNano()), }).Return(nil).Times(1) - err := s.replicationTaskProcessor.handleSyncShardStatus(&replicator.SyncShardStatus{ + err := s.taskProcessor.handleSyncShardStatus(&replicator.SyncShardStatus{ Timestamp: common.Int64Ptr(now.UnixNano()), }) s.NoError(err) } -func (s *replicationTaskProcessorSuite) TestPutReplicationTaskToDLQ_SyncActivityReplicationTask() { +func (s *taskProcessorSuite) TestPutReplicationTaskToDLQ_SyncActivityReplicationTask() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() @@ -176,11 +176,11 @@ func (s *replicationTaskProcessorSuite) TestPutReplicationTaskToDLQ_SyncActivity }, } s.executionManager.On("PutReplicationTaskToDLQ", request).Return(nil) - err := s.replicationTaskProcessor.putReplicationTaskToDLQ(task) + err := s.taskProcessor.putReplicationTaskToDLQ(task) s.NoError(err) } -func (s *replicationTaskProcessorSuite) TestPutReplicationTaskToDLQ_HistoryReplicationTask() { +func (s *taskProcessorSuite) TestPutReplicationTaskToDLQ_HistoryReplicationTask() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() @@ -203,11 +203,11 @@ func (s *replicationTaskProcessorSuite) TestPutReplicationTaskToDLQ_HistoryRepli }, } s.executionManager.On("PutReplicationTaskToDLQ", request).Return(nil) - err := s.replicationTaskProcessor.putReplicationTaskToDLQ(task) + err := s.taskProcessor.putReplicationTaskToDLQ(task) s.NoError(err) } -func (s *replicationTaskProcessorSuite) TestPutReplicationTaskToDLQ_HistoryV2ReplicationTask() { +func (s *taskProcessorSuite) TestPutReplicationTaskToDLQ_HistoryV2ReplicationTask() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() @@ -245,6 +245,6 @@ func (s *replicationTaskProcessorSuite) TestPutReplicationTaskToDLQ_HistoryV2Rep }, } s.executionManager.On("PutReplicationTaskToDLQ", request).Return(nil) - err = s.replicationTaskProcessor.putReplicationTaskToDLQ(task) + err = s.taskProcessor.putReplicationTaskToDLQ(task) s.NoError(err) } diff --git a/service/history/replicationDLQHandler_mock.go b/service/history/replicationDLQHandler_mock.go deleted file mode 100644 index 9af98215b38..00000000000 --- a/service/history/replicationDLQHandler_mock.go +++ /dev/null @@ -1,105 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. -// - -// Code generated by MockGen. DO NOT EDIT. -// Source: replicationDLQHandler.go - -// Package history is a generated GoMock package. -package history - -import ( - context "context" - reflect "reflect" - - gomock "github.com/golang/mock/gomock" - - replicator "github.com/uber/cadence/.gen/go/replicator" -) - -// MockreplicationDLQHandler is a mock of replicationDLQHandler interface -type MockreplicationDLQHandler struct { - ctrl *gomock.Controller - recorder *MockreplicationDLQHandlerMockRecorder -} - -// MockreplicationDLQHandlerMockRecorder is the mock recorder for MockreplicationDLQHandler -type MockreplicationDLQHandlerMockRecorder struct { - mock *MockreplicationDLQHandler -} - -// NewMockreplicationDLQHandler creates a new mock instance -func NewMockreplicationDLQHandler(ctrl *gomock.Controller) *MockreplicationDLQHandler { - mock := &MockreplicationDLQHandler{ctrl: ctrl} - mock.recorder = &MockreplicationDLQHandlerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockreplicationDLQHandler) EXPECT() *MockreplicationDLQHandlerMockRecorder { - return m.recorder -} - -// readMessages mocks base method -func (m *MockreplicationDLQHandler) readMessages(ctx context.Context, sourceCluster string, lastMessageID int64, pageSize int, pageToken []byte) ([]*replicator.ReplicationTask, []byte, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "readMessages", ctx, sourceCluster, lastMessageID, pageSize, pageToken) - ret0, _ := ret[0].([]*replicator.ReplicationTask) - ret1, _ := ret[1].([]byte) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 -} - -// readMessages indicates an expected call of readMessages -func (mr *MockreplicationDLQHandlerMockRecorder) readMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "readMessages", reflect.TypeOf((*MockreplicationDLQHandler)(nil).readMessages), ctx, sourceCluster, lastMessageID, pageSize, pageToken) -} - -// purgeMessages mocks base method -func (m *MockreplicationDLQHandler) purgeMessages(sourceCluster string, lastMessageID int64) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "purgeMessages", sourceCluster, lastMessageID) - ret0, _ := ret[0].(error) - return ret0 -} - -// purgeMessages indicates an expected call of purgeMessages -func (mr *MockreplicationDLQHandlerMockRecorder) purgeMessages(sourceCluster, lastMessageID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "purgeMessages", reflect.TypeOf((*MockreplicationDLQHandler)(nil).purgeMessages), sourceCluster, lastMessageID) -} - -// mergeMessages mocks base method -func (m *MockreplicationDLQHandler) mergeMessages(ctx context.Context, sourceCluster string, lastMessageID int64, pageSize int, pageToken []byte) ([]byte, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "mergeMessages", ctx, sourceCluster, lastMessageID, pageSize, pageToken) - ret0, _ := ret[0].([]byte) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// mergeMessages indicates an expected call of mergeMessages -func (mr *MockreplicationDLQHandlerMockRecorder) mergeMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mergeMessages", reflect.TypeOf((*MockreplicationDLQHandler)(nil).mergeMessages), ctx, sourceCluster, lastMessageID, pageSize, pageToken) -} diff --git a/service/history/replicatorQueueProcessor.go b/service/history/replicatorQueueProcessor.go index 71636735fb4..7426f5d49d6 100644 --- a/service/history/replicatorQueueProcessor.go +++ b/service/history/replicatorQueueProcessor.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//TODO move this file to replication subfolder + package history import ( @@ -446,7 +448,7 @@ func (p *replicatorQueueProcessorImpl) getTasks( lastReadTaskID int64, ) (*replicator.ReplicationMessages, error) { - if lastReadTaskID == emptyMessageID { + if lastReadTaskID == common.EmptyMessageID { lastReadTaskID = p.shard.GetClusterReplicationLevel(pollingCluster) }