Fix history resender source cluster (#3365)
yux0 committed Jun 30, 2020
1 parent 514d765 commit b066ed1
Showing 7 changed files with 92 additions and 69 deletions.
70 changes: 37 additions & 33 deletions service/history/historyEngine.go
@@ -249,40 +249,44 @@ func NewEngineWithShardContext(
         )
     }
     historyEngImpl.decisionHandler = newDecisionHandler(historyEngImpl)
-
-    nDCHistoryResender := xdc.NewNDCHistoryResender(
-        shard.GetDomainCache(),
-        shard.GetService().GetClientBean().GetRemoteAdminClient(currentClusterName),
-        func(ctx context.Context, request *h.ReplicateEventsV2Request) error {
-            return shard.GetService().GetHistoryClient().ReplicateEventsV2(ctx, request)
-        },
-        shard.GetService().GetPayloadSerializer(),
-        nil,
-        shard.GetLogger(),
-    )
-    historyRereplicator := xdc.NewHistoryRereplicator(
-        currentClusterName,
-        shard.GetDomainCache(),
-        shard.GetService().GetClientBean().GetRemoteAdminClient(currentClusterName),
-        func(ctx context.Context, request *h.ReplicateRawEventsRequest) error {
-            return shard.GetService().GetHistoryClient().ReplicateRawEvents(ctx, request)
-        },
-        shard.GetService().GetPayloadSerializer(),
-        replicationTimeout,
-        nil,
-        shard.GetLogger(),
-    )
-    replicationTaskExecutor := replication.NewTaskExecutor(
-        currentClusterName,
-        shard.GetDomainCache(),
-        nDCHistoryResender,
-        historyRereplicator,
-        historyEngImpl,
-        shard.GetMetricsClient(),
-        shard.GetLogger(),
-    )
     var replicationTaskProcessors []replication.TaskProcessor
+    replicationTaskExecutors := make(map[string]replication.TaskExecutor)
     for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() {
+        sourceCluster := replicationTaskFetcher.GetSourceCluster()
+        nDCHistoryResender := xdc.NewNDCHistoryResender(
+            shard.GetDomainCache(),
+            shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster),
+            func(ctx context.Context, request *h.ReplicateEventsV2Request) error {
+                return shard.GetService().GetHistoryClient().ReplicateEventsV2(ctx, request)
+            },
+            shard.GetService().GetPayloadSerializer(),
+            nil,
+            shard.GetLogger(),
+        )
+        historyRereplicator := xdc.NewHistoryRereplicator(
+            currentClusterName,
+            shard.GetDomainCache(),
+            shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster),
+            func(ctx context.Context, request *h.ReplicateRawEventsRequest) error {
+                return shard.GetService().GetHistoryClient().ReplicateRawEvents(ctx, request)
+            },
+            shard.GetService().GetPayloadSerializer(),
+            replicationTimeout,
+            nil,
+            shard.GetLogger(),
+        )
+        replicationTaskExecutor := replication.NewTaskExecutor(
+            sourceCluster,
+            shard,
+            shard.GetDomainCache(),
+            nDCHistoryResender,
+            historyRereplicator,
+            historyEngImpl,
+            shard.GetMetricsClient(),
+            shard.GetLogger(),
+        )
+        replicationTaskExecutors[sourceCluster] = replicationTaskExecutor
+
         replicationTaskProcessor := replication.NewTaskProcessor(
             shard,
             historyEngImpl,
@@ -294,7 +298,7 @@ func NewEngineWithShardContext(
         replicationTaskProcessors = append(replicationTaskProcessors, replicationTaskProcessor)
     }
     historyEngImpl.replicationTaskProcessors = replicationTaskProcessors
-    replicationMessageHandler := replication.NewDLQHandler(shard, replicationTaskExecutor)
+    replicationMessageHandler := replication.NewDLQHandler(shard, replicationTaskExecutors)
    historyEngImpl.replicationDLQHandler = replicationMessageHandler

    shard.SetEngine(historyEngImpl)
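For orientation before the next file, here is a minimal, self-contained Go sketch of the pattern the hunk above introduces: one resender/executor per source cluster, keyed by cluster name, instead of a single executor bound to the current cluster. Fetcher, Executor, and buildExecutors are illustrative stand-ins, not the Cadence types.

package main

import "fmt"

// Fetcher stands in for a replication task fetcher; each one pulls tasks
// from exactly one remote (source) cluster.
type Fetcher struct{ sourceCluster string }

func (f Fetcher) GetSourceCluster() string { return f.sourceCluster }

// Executor stands in for a replication task executor bound to one source cluster.
type Executor struct{ sourceCluster string }

// buildExecutors mirrors the loop above: for every fetcher, build an executor
// (and, in the real code, a resender, rereplicator, and remote admin client)
// for that fetcher's source cluster, and index it by cluster name.
func buildExecutors(fetchers []Fetcher) map[string]Executor {
	executors := make(map[string]Executor)
	for _, f := range fetchers {
		sourceCluster := f.GetSourceCluster()
		executors[sourceCluster] = Executor{sourceCluster: sourceCluster}
	}
	return executors
}

func main() {
	executors := buildExecutors([]Fetcher{{"standby-east"}, {"standby-west"}})
	fmt.Println(len(executors), executors["standby-east"].sourceCluster)
}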
30 changes: 21 additions & 9 deletions service/history/replication/dlq_handler.go
@@ -26,13 +26,18 @@ import (
    "context"

    "github.com/uber/cadence/.gen/go/replicator"
+   workflow "github.com/uber/cadence/.gen/go/shared"
    "github.com/uber/cadence/common"
    "github.com/uber/cadence/common/log"
    "github.com/uber/cadence/common/log/tag"
    "github.com/uber/cadence/common/persistence"
    "github.com/uber/cadence/service/history/shard"
 )

+var (
+   errInvalidCluster = &workflow.BadRequestError{Message: "Invalid target cluster name."}
+)
+
 type (
    // DLQHandler is the interface that handles replication DLQ messages
    DLQHandler interface {
@@ -57,9 +62,9 @@ type (
    }

    dlqHandlerImpl struct {
-       taskExecutor TaskExecutor
-       shard shard.Context
-       logger log.Logger
+       taskExecutors map[string]TaskExecutor
+       shard shard.Context
+       logger log.Logger
    }
 )

@@ -68,13 +73,17 @@ var _ DLQHandler = (*dlqHandlerImpl)(nil)
 // NewDLQHandler initializes the replication message DLQ handler
 func NewDLQHandler(
    shard shard.Context,
-   taskExecutor TaskExecutor,
+   taskExecutors map[string]TaskExecutor,
 ) DLQHandler {

+   if taskExecutors == nil {
+       panic("Failed to initialize replication DLQ handler due to nil task executors")
+   }
+
    return &dlqHandlerImpl{
-       shard: shard,
-       taskExecutor: taskExecutor,
-       logger: shard.GetLogger(),
+       shard: shard,
+       taskExecutors: taskExecutors,
+       logger: shard.GetLogger(),
    }
 }

@@ -184,6 +193,10 @@ func (r *dlqHandlerImpl) MergeMessages(
    pageToken []byte,
 ) ([]byte, error) {

+   if _, ok := r.taskExecutors[sourceCluster]; !ok {
+       return nil, errInvalidCluster
+   }
+
    tasks, ackLevel, token, err := r.readMessagesWithAckLevel(
        ctx,
        sourceCluster,
@@ -193,8 +206,7 @@ func (r *dlqHandlerImpl) MergeMessages(
    )

    for _, task := range tasks {
-       if _, err := r.taskExecutor.execute(
-           sourceCluster,
+       if _, err := r.taskExecutors[sourceCluster].execute(
            task,
            true,
        ); err != nil {
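A condensed sketch of the guard-and-dispatch shape that MergeMessages now has: look up the executor registered for the requested source cluster, reject unknown clusters up front, then apply each DLQ task with that cluster's executor. Task, Executor, and dispatchToCluster below are simplified stand-ins, not the package's real types.

package main

import (
	"errors"
	"fmt"
)

type Task struct{ ID int64 }

// Executor is an illustrative stand-in for the package's TaskExecutor.
type Executor interface {
	Execute(task Task, forceApply bool) (int, error)
}

type noopExecutor struct{}

func (noopExecutor) Execute(Task, bool) (int, error) { return 0, nil }

var errInvalidCluster = errors.New("invalid target cluster name")

// dispatchToCluster fails fast when the cluster has no registered executor,
// then applies every task with the executor bound to that source cluster.
func dispatchToCluster(executors map[string]Executor, sourceCluster string, tasks []Task) error {
	executor, ok := executors[sourceCluster]
	if !ok {
		return errInvalidCluster
	}
	for _, task := range tasks {
		if _, err := executor.Execute(task, true); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	executors := map[string]Executor{"standby": noopExecutor{}}
	fmt.Println(dispatchToCluster(executors, "standby", []Task{{ID: 1}})) // <nil>
	fmt.Println(dispatchToCluster(executors, "unknown", nil))             // invalid target cluster name
}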
25 changes: 14 additions & 11 deletions service/history/replication/dlq_handler_test.go
@@ -55,6 +55,8 @@ type (
        executionManager *mocks.ExecutionManager
        shardManager *mocks.ShardManager
        taskExecutor *MockTaskExecutor
+       taskExecutors map[string]TaskExecutor
+       sourceCluster string

        messageHandler *dlqHandlerImpl
    }
@@ -96,11 +98,14 @@ func (s *dlqHandlerSuite) SetupTest() {
    s.shardManager = s.mockShard.Resource.ShardMgr

    s.clusterMetadata.EXPECT().GetCurrentClusterName().Return("active").AnyTimes()
+   s.taskExecutors = make(map[string]TaskExecutor)
    s.taskExecutor = NewMockTaskExecutor(s.controller)
+   s.sourceCluster = "test"
+   s.taskExecutors[s.sourceCluster] = s.taskExecutor

    s.messageHandler = NewDLQHandler(
        s.mockShard,
-       s.taskExecutor,
+       s.taskExecutors,
    ).(*dlqHandlerImpl)
 }

@@ -111,7 +116,6 @@ func (s *dlqHandlerSuite) TearDownTest() {

 func (s *dlqHandlerSuite) TestReadMessages_OK() {
    ctx := context.Background()
-   sourceCluster := "test"
    lastMessageID := int64(1)
    pageSize := 1
    pageToken := []byte{}
@@ -128,7 +132,7 @@ func (s *dlqHandlerSuite) TestReadMessages_OK() {
        },
    }
    s.executionManager.On("GetReplicationTasksFromDLQ", &persistence.GetReplicationTasksFromDLQRequest{
-       SourceClusterName: sourceCluster,
+       SourceClusterName: s.sourceCluster,
        GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
            ReadLevel: -1,
            MaxReadLevel: lastMessageID,
@@ -137,11 +141,11 @@ func (s *dlqHandlerSuite) TestReadMessages_OK() {
        },
    }).Return(resp, nil).Times(1)

-   s.mockClientBean.EXPECT().GetRemoteAdminClient(sourceCluster).Return(s.adminClient).AnyTimes()
+   s.mockClientBean.EXPECT().GetRemoteAdminClient(s.sourceCluster).Return(s.adminClient).AnyTimes()
    s.adminClient.EXPECT().
        GetDLQReplicationMessages(ctx, gomock.Any()).
        Return(&replicator.GetDLQReplicationMessagesResponse{}, nil)
-   tasks, token, err := s.messageHandler.ReadMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken)
+   tasks, token, err := s.messageHandler.ReadMessages(ctx, s.sourceCluster, lastMessageID, pageSize, pageToken)
    s.NoError(err)
    s.Nil(token)
    s.Nil(tasks)
@@ -165,7 +169,6 @@ func (s *dlqHandlerSuite) TestPurgeMessages_OK() {

 func (s *dlqHandlerSuite) TestMergeMessages_OK() {
    ctx := context.Background()
-   sourceCluster := "test"
    lastMessageID := int64(1)
    pageSize := 1
    pageToken := []byte{}
@@ -182,7 +185,7 @@ func (s *dlqHandlerSuite) TestMergeMessages_OK() {
        },
    }
    s.executionManager.On("GetReplicationTasksFromDLQ", &persistence.GetReplicationTasksFromDLQRequest{
-       SourceClusterName: sourceCluster,
+       SourceClusterName: s.sourceCluster,
        GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
            ReadLevel: -1,
            MaxReadLevel: lastMessageID,
@@ -191,7 +194,7 @@ func (s *dlqHandlerSuite) TestMergeMessages_OK() {
        },
    }).Return(resp, nil).Times(1)

-   s.mockClientBean.EXPECT().GetRemoteAdminClient(sourceCluster).Return(s.adminClient).AnyTimes()
+   s.mockClientBean.EXPECT().GetRemoteAdminClient(s.sourceCluster).Return(s.adminClient).AnyTimes()
    replicationTask := &replicator.ReplicationTask{
        TaskType: replicator.ReplicationTaskTypeHistory.Ptr(),
        SourceTaskId: common.Int64Ptr(lastMessageID),
@@ -204,17 +207,17 @@ func (s *dlqHandlerSuite) TestMergeMessages_OK() {
            replicationTask,
        },
    }, nil)
-   s.taskExecutor.EXPECT().execute(sourceCluster, replicationTask, true).Return(0, nil).Times(1)
+   s.taskExecutor.EXPECT().execute(replicationTask, true).Return(0, nil).Times(1)
    s.executionManager.On("RangeDeleteReplicationTaskFromDLQ",
        &persistence.RangeDeleteReplicationTaskFromDLQRequest{
-           SourceClusterName: sourceCluster,
+           SourceClusterName: s.sourceCluster,
            ExclusiveBeginTaskID: -1,
            InclusiveEndTaskID: lastMessageID,
        }).Return(nil).Times(1)

    s.shardManager.On("UpdateShard", mock.Anything).Return(nil)

-   token, err := s.messageHandler.MergeMessages(ctx, sourceCluster, lastMessageID, pageSize, pageToken)
+   token, err := s.messageHandler.MergeMessages(ctx, s.sourceCluster, lastMessageID, pageSize, pageToken)
    s.NoError(err)
    s.Nil(token)
 }
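The suite above re-wires the handler with a map of executors keyed by s.sourceCluster. As a standalone illustration of that wiring (fakeExecutor and handler below are simplified stand-ins, not the suite's gomock-based types), an equivalent test might look like:

package replication_sketch

import (
	"errors"
	"testing"
)

// fakeExecutor records how many tasks it was asked to apply.
type fakeExecutor struct{ calls int }

func (f *fakeExecutor) Execute(task string, forceApply bool) (int, error) {
	f.calls++
	return 0, nil
}

// handler holds one executor per source cluster, mirroring dlqHandlerImpl's map.
type handler struct {
	executors map[string]*fakeExecutor
}

var errInvalidCluster = errors.New("invalid target cluster name")

func (h *handler) merge(sourceCluster, task string) error {
	executor, ok := h.executors[sourceCluster]
	if !ok {
		return errInvalidCluster
	}
	_, err := executor.Execute(task, true)
	return err
}

func TestMergeUsesExecutorForSourceCluster(t *testing.T) {
	executor := &fakeExecutor{}
	h := &handler{executors: map[string]*fakeExecutor{"test": executor}}

	if err := h.merge("test", "history-task"); err != nil {
		t.Fatalf("merge: %v", err)
	}
	if executor.calls != 1 {
		t.Fatalf("expected executor to be called once, got %d", executor.calls)
	}
	if err := h.merge("unknown", "history-task"); !errors.Is(err, errInvalidCluster) {
		t.Fatalf("expected errInvalidCluster, got %v", err)
	}
}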
18 changes: 11 additions & 7 deletions service/history/replication/task_executor.go
@@ -35,16 +35,19 @@ import (
    "github.com/uber/cadence/common/metrics"
    "github.com/uber/cadence/common/xdc"
    "github.com/uber/cadence/service/history/engine"
+   "github.com/uber/cadence/service/history/shard"
 )

 type (
    // TaskExecutor is the executor for replication tasks
    TaskExecutor interface {
-       execute(sourceCluster string, replicationTask *r.ReplicationTask, forceApply bool) (int, error)
+       execute(replicationTask *r.ReplicationTask, forceApply bool) (int, error)
    }

    taskExecutorImpl struct {
        currentCluster string
+       sourceCluster string
+       shard shard.Context
        domainCache cache.DomainCache
        nDCHistoryResender xdc.NDCHistoryResender
        historyRereplicator xdc.HistoryRereplicator
@@ -60,7 +63,8 @@ var _ TaskExecutor = (*taskExecutorImpl)(nil)
 // NewTaskExecutor creates a replication task executor
 // The executor is used by 1) the DLQ replication task handler and 2) the history replication task processor
 func NewTaskExecutor(
-   currentCluster string,
+   sourceCluster string,
+   shard shard.Context,
    domainCache cache.DomainCache,
    nDCHistoryResender xdc.NDCHistoryResender,
    historyRereplicator xdc.HistoryRereplicator,
@@ -69,7 +73,9 @@ func NewTaskExecutor(
    logger log.Logger,
 ) TaskExecutor {
    return &taskExecutorImpl{
-       currentCluster: currentCluster,
+       currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(),
+       sourceCluster: sourceCluster,
+       shard: shard,
        domainCache: domainCache,
        nDCHistoryResender: nDCHistoryResender,
        historyRereplicator: historyRereplicator,
@@ -80,7 +86,6 @@ func NewTaskExecutor(
 }

 func (e *taskExecutorImpl) execute(
-   sourceCluster string,
    replicationTask *r.ReplicationTask,
    forceApply bool,
 ) (int, error) {
@@ -96,7 +101,7 @@ func (e *taskExecutorImpl) execute(
        err = e.handleActivityTask(replicationTask, forceApply)
    case r.ReplicationTaskTypeHistory:
        scope = metrics.HistoryReplicationTaskScope
-       err = e.handleHistoryReplicationTask(sourceCluster, replicationTask, forceApply)
+       err = e.handleHistoryReplicationTask(replicationTask, forceApply)
    case r.ReplicationTaskTypeHistoryMetadata:
        // Without Kafka we should not have size limits, so we don't necessarily need this in the new replication scheme.
        scope = metrics.HistoryMetadataReplicationTaskScope
@@ -195,7 +200,6 @@ func (e *taskExecutorImpl) handleActivityTask(

 //TODO: remove this part after 2DC deprecation
 func (e *taskExecutorImpl) handleHistoryReplicationTask(
-   sourceCluster string,
    task *r.ReplicationTask,
    forceApply bool,
 ) error {
@@ -207,7 +211,7 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
    }

    request := &history.ReplicateEventsRequest{
-       SourceCluster: common.StringPtr(sourceCluster),
+       SourceCluster: common.StringPtr(e.sourceCluster),
        DomainUUID: attr.DomainId,
        WorkflowExecution: &shared.WorkflowExecution{
            WorkflowId: attr.WorkflowId,
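The net effect of this file's changes is that the source cluster is now fixed when the executor is constructed rather than passed on every execute call. A minimal stand-alone sketch of that construction-time binding (taskExecutor and newTaskExecutor here are illustrative, not the real implementation):

package main

import "fmt"

// taskExecutor is bound to exactly one remote (source) cluster at construction,
// so callers no longer have to thread the cluster name through each call.
type taskExecutor struct {
	currentCluster string
	sourceCluster  string
}

func newTaskExecutor(currentCluster, sourceCluster string) *taskExecutor {
	return &taskExecutor{currentCluster: currentCluster, sourceCluster: sourceCluster}
}

// execute no longer needs the caller to say where the task came from.
func (e *taskExecutor) execute(task string, forceApply bool) (int, error) {
	fmt.Printf("applying %q from %s on %s (force=%v)\n", task, e.sourceCluster, e.currentCluster, forceApply)
	return 1, nil
}

func main() {
	executor := newTaskExecutor("active", "standby")
	executor.execute("history-replication-task", true)
}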
8 changes: 4 additions & 4 deletions service/history/replication/task_executor_mock.go

Some generated files are not rendered by default.
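The regenerated mock itself is not rendered above. Assuming it is produced by mockgen (as the _mock.go suffix suggests), the updated execute method would presumably just lose its sourceCluster parameter; the hand-written approximation below shows that shape. It is a sketch, not the actual generated file, and Task stands in for *replicator.ReplicationTask.

package replication_sketch

import (
	"reflect"

	"github.com/golang/mock/gomock"
)

// Task stands in for *replicator.ReplicationTask so the sketch is self-contained.
type Task struct{ ID int64 }

// MockTaskExecutor approximates what mockgen would regenerate for the
// narrowed execute(replicationTask, forceApply) signature.
type MockTaskExecutor struct {
	ctrl     *gomock.Controller
	recorder *MockTaskExecutorMockRecorder
}

// MockTaskExecutorMockRecorder records expected calls on MockTaskExecutor.
type MockTaskExecutorMockRecorder struct {
	mock *MockTaskExecutor
}

func NewMockTaskExecutor(ctrl *gomock.Controller) *MockTaskExecutor {
	mock := &MockTaskExecutor{ctrl: ctrl}
	mock.recorder = &MockTaskExecutorMockRecorder{mock}
	return mock
}

func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder {
	return m.recorder
}

// execute mocks the base method; note there is no sourceCluster argument anymore.
func (m *MockTaskExecutor) execute(replicationTask *Task, forceApply bool) (int, error) {
	ret := m.ctrl.Call(m, "execute", replicationTask, forceApply)
	ret0, _ := ret[0].(int)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// execute registers an expected call of execute.
func (mr *MockTaskExecutorMockRecorder) execute(replicationTask, forceApply interface{}) *gomock.Call {
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "execute", reflect.TypeOf((*MockTaskExecutor)(nil).execute), replicationTask, forceApply)
}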

