Skip to content

Commit

Permalink
Fix metadata replication task with NDC (#3218)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Apr 26, 2020
1 parent a8f7c49 commit 72f3792
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 29 deletions.
52 changes: 48 additions & 4 deletions .gen/go/replicator/replicator.go

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion idls
Submodule idls updated from fb394b to 003757
50 changes: 37 additions & 13 deletions service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,27 +212,51 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(
}

err = p.replicator.Publish(replicationTask)
if err == messaging.ErrMessageSizeLimit && replicationTask.HistoryTaskAttributes != nil {
if err == messaging.ErrMessageSizeLimit {
// message size exceeds the server messaging size limit
// for this specific case, just send out a metadata message and
// let receiver fetch from source (for the concrete history events)
err = p.replicator.Publish(p.generateHistoryMetadataTask(replicationTask.HistoryTaskAttributes.TargetClusters, task))
if metadataTask := p.generateHistoryMetadataTask(
task,
replicationTask,
); metadataTask != nil {
err = p.replicator.Publish(metadataTask)
}
}
return err
}

func (p *replicatorQueueProcessorImpl) generateHistoryMetadataTask(targetClusters []string, task *persistence.ReplicationTaskInfo) *replicator.ReplicationTask {
return &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskTypeHistoryMetadata.Ptr(),
HistoryMetadataTaskAttributes: &replicator.HistoryMetadataTaskAttributes{
TargetClusters: targetClusters,
DomainId: common.StringPtr(task.DomainID),
WorkflowId: common.StringPtr(task.WorkflowID),
RunId: common.StringPtr(task.RunID),
FirstEventId: common.Int64Ptr(task.FirstEventID),
NextEventId: common.Int64Ptr(task.NextEventID),
},
// generateHistoryMetadataTask builds a history-metadata replication task that
// identifies a range of workflow events by domain, workflow, run and
// first/next event ID, using the values stored on task.
//
// Which optional field is populated depends on the attributes carried by
// replicationTask:
//   - HistoryTaskAttributes set: TargetClusters is copied from those attributes.
//   - otherwise HistoryTaskV2Attributes set: Version is taken from task.Version.
//
// It returns nil when replicationTask carries neither kind of attributes.
func (p *replicatorQueueProcessorImpl) generateHistoryMetadataTask(
	task *persistence.ReplicationTaskInfo,
	replicationTask *replicator.ReplicationTask,
) *replicator.ReplicationTask {

	historyAttr := replicationTask.HistoryTaskAttributes
	if historyAttr == nil && replicationTask.HistoryTaskV2Attributes == nil {
		// Nothing to derive a metadata task from; bail out before touching task.
		return nil
	}

	// Fields common to both flavours of the metadata task.
	metadataAttr := &replicator.HistoryMetadataTaskAttributes{
		DomainId:     common.StringPtr(task.DomainID),
		WorkflowId:   common.StringPtr(task.WorkflowID),
		RunId:        common.StringPtr(task.RunID),
		FirstEventId: common.Int64Ptr(task.FirstEventID),
		NextEventId:  common.Int64Ptr(task.NextEventID),
	}

	// HistoryTaskAttributes takes precedence when both are present.
	if historyAttr != nil {
		metadataAttr.TargetClusters = historyAttr.TargetClusters
	} else {
		metadataAttr.Version = common.Int64Ptr(task.Version)
	}

	return &replicator.ReplicationTask{
		TaskType:                      replicator.ReplicationTaskTypeHistoryMetadata.Ptr(),
		HistoryMetadataTaskAttributes: metadataAttr,
	}
}

// GenerateReplicationTask generate replication task
Expand Down
1 change: 1 addition & 0 deletions service/worker/replicator/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func (p *replicationTaskProcessor) handleHistoryMetadataReplicationTask(
p.historyClient,
p.metricsClient,
p.historyRereplicator,
p.nDCHistoryResender,
)
return p.sequentialTaskProcessor.Submit(historyMetadataReplicationTask)
}
Expand Down
20 changes: 20 additions & 0 deletions service/worker/replicator/replicationTask.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ type (
sourceCluster string
firstEventID int64
nextEventID int64
version *int64
historyRereplicator xdc.HistoryRereplicator
nDCHistoryResender xdc.NDCHistoryResender
}

historyReplicationV2Task struct {
Expand Down Expand Up @@ -218,6 +220,7 @@ func newHistoryMetadataReplicationTask(
historyClient history.Client,
metricsClient metrics.Client,
historyRereplicator xdc.HistoryRereplicator,
nDCHistoryResender xdc.NDCHistoryResender,
) *historyMetadataReplicationTask {

attr := replicationTask.HistoryMetadataTaskAttributes
Expand All @@ -226,6 +229,11 @@ func newHistoryMetadataReplicationTask(
tag.WorkflowRunID(attr.GetRunId()),
tag.WorkflowFirstEventID(attr.GetFirstEventId()),
tag.WorkflowNextEventID(attr.GetNextEventId()))
var version *int64
if attr.IsSetVersion() {
version = attr.Version
}

return &historyMetadataReplicationTask{
workflowReplicationTask: workflowReplicationTask{
metricsScope: metrics.HistoryMetadataReplicationTaskScope,
Expand All @@ -246,7 +254,9 @@ func newHistoryMetadataReplicationTask(
sourceCluster: sourceCluster,
firstEventID: attr.GetFirstEventId(),
nextEventID: attr.GetNextEventId(),
version: version,
historyRereplicator: historyRereplicator,
nDCHistoryResender: nDCHistoryResender,
}
}

Expand Down Expand Up @@ -410,6 +420,16 @@ func (t *historyMetadataReplicationTask) Execute() error {
stopwatch := t.metricsClient.StartTimer(metrics.HistoryRereplicationByHistoryMetadataReplicationScope, metrics.CadenceClientLatency)
defer stopwatch.Stop()

if t.version != nil {
return t.nDCHistoryResender.SendSingleWorkflowHistory(
t.queueID.DomainID,
t.queueID.WorkflowID,
t.queueID.RunID,
common.Int64Ptr(t.firstEventID-1), //NDC resend API is exclusive-exclusive.
t.version,
common.Int64Ptr(t.nextEventID),
t.version)
}
return t.historyRereplicator.SendMultiWorkflowHistory(
t.queueID.DomainID, t.queueID.WorkflowID,
t.queueID.RunID, t.firstEventID,
Expand Down
22 changes: 13 additions & 9 deletions service/worker/replicator/replicationTask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type (
mockMsg *messageMocks.Message
mockHistoryClient *historyservicetest.MockClient
mockRereplicator *xdc.MockHistoryRereplicator
mockNDCResender *xdc.MockNDCHistoryResender

controller *gomock.Controller
}
Expand Down Expand Up @@ -201,6 +202,7 @@ func (s *historyMetadataReplicationTaskSuite) SetupTest() {
s.controller = gomock.NewController(s.T())
s.mockHistoryClient = historyservicetest.NewMockClient(s.controller)
s.mockRereplicator = &xdc.MockHistoryRereplicator{}
s.mockNDCResender = &xdc.MockNDCHistoryResender{}
}

func (s *historyMetadataReplicationTaskSuite) TearDownTest() {
Expand Down Expand Up @@ -635,6 +637,7 @@ func (s *historyMetadataReplicationTaskSuite) TestNewHistoryMetadataReplicationT
s.mockHistoryClient,
s.metricsClient,
s.mockRereplicator,
s.mockNDCResender,
)
// overwrite the logger for easy comparison
metadataTask.logger = s.logger
Expand Down Expand Up @@ -663,14 +666,15 @@ func (s *historyMetadataReplicationTaskSuite) TestNewHistoryMetadataReplicationT
firstEventID: replicationAttr.GetFirstEventId(),
nextEventID: replicationAttr.GetNextEventId(),
historyRereplicator: s.mockRereplicator,
nDCHistoryResender: s.mockNDCResender,
},
metadataTask,
)
}

func (s *historyMetadataReplicationTaskSuite) TestExecute() {
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)

randomErr := errors.New("some random error")
s.mockRereplicator.On("SendMultiWorkflowHistory",
Expand All @@ -685,7 +689,7 @@ func (s *historyMetadataReplicationTaskSuite) TestExecute() {

func (s *historyMetadataReplicationTaskSuite) TestHandleErr_NotRetryErr() {
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)
randomErr := errors.New("some random error")

err := task.HandleErr(randomErr)
Expand All @@ -694,7 +698,7 @@ func (s *historyMetadataReplicationTaskSuite) TestHandleErr_NotRetryErr() {

func (s *historyMetadataReplicationTaskSuite) TestHandleErr_RetryErr() {
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)
retryErr := &shared.RetryTaskError{
DomainId: common.StringPtr(task.queueID.DomainID),
WorkflowId: common.StringPtr(task.queueID.WorkflowID),
Expand Down Expand Up @@ -727,45 +731,45 @@ func (s *historyMetadataReplicationTaskSuite) TestHandleErr_RetryErr() {
// TestRetryErr_NonRetryable checks that RetryErr reports false when the task
// is given a BadRequestError.
func (s *historyMetadataReplicationTaskSuite) TestRetryErr_NonRetryable() {
err := &shared.BadRequestError{}
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)
s.False(task.RetryErr(err))
}

// TestRetryErr_Retryable checks that RetryErr reports true for an
// InternalServiceError when the task's attempt counter is zero.
func (s *historyMetadataReplicationTaskSuite) TestRetryErr_Retryable() {
err := &shared.InternalServiceError{}
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)
task.attempt = 0
s.True(task.RetryErr(err))
}

// TestRetryErr_Retryable_ExceedAttempt checks that RetryErr reports false for
// an InternalServiceError once the task's attempt counter is well past the
// configured ReplicationTaskMaxRetryCount.
func (s *historyMetadataReplicationTaskSuite) TestRetryErr_Retryable_ExceedAttempt() {
err := &shared.InternalServiceError{}
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)
task.attempt = s.config.ReplicationTaskMaxRetryCount() + 100
s.False(task.RetryErr(err))
}

// TestRetryErr_Retryable_ExceedDuration checks that RetryErr reports false for
// an InternalServiceError when the task's start time lies twice the configured
// ReplicationTaskMaxRetryDuration before the mock time source's current time.
func (s *historyMetadataReplicationTaskSuite) TestRetryErr_Retryable_ExceedDuration() {
err := &shared.InternalServiceError{}
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)
task.startTime = s.mockTimeSource.Now().Add(-2 * s.config.ReplicationTaskMaxRetryDuration())
s.False(task.RetryErr(err))
}

// TestAck registers a single expected "Ack" call on the mock message and then
// invokes Ack on a freshly built history metadata replication task.
func (s *historyMetadataReplicationTaskSuite) TestAck() {
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)

// NOTE(review): the expectation is presumably asserted during test teardown — confirm.
s.mockMsg.On("Ack").Return(nil).Once()
task.Ack()
}

func (s *historyMetadataReplicationTaskSuite) TestNack() {
task := newHistoryMetadataReplicationTask(s.getHistoryMetadataReplicationTask(), s.mockMsg, s.sourceCluster, s.logger,
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator)
s.config, s.mockTimeSource, s.mockHistoryClient, s.metricsClient, s.mockRereplicator, s.mockNDCResender)

s.mockMsg.On("Nack").Return(nil).Once()
task.Nack()
Expand Down

0 comments on commit 72f3792

Please sign in to comment.