From c444cac9082c7f9a85f5a7a887d7c2c7f97e5f03 Mon Sep 17 00:00:00 2001 From: Samar Abbas - Uber Date: Tue, 27 Mar 2018 12:57:55 -0700 Subject: [PATCH] Replication task processor bugfixes (#635) Use mutableStateBuilder.GetNextEventID rather than managing the event ID on the history builder for generation of replication tasks. Managing the next event ID on the history builder does not account for buffered events and causes an incorrect next event ID to be set on the replication task. Fix the missing metric scope definitions for the replication queue processor. Use GetLastFirstEventID instead of managing custom state in the history builder. --- common/metrics/defs.go | 2 ++ service/history/historyBuilder.go | 20 ++++++++------------ service/history/historyEngine.go | 4 ++-- service/history/mutableStateBuilder.go | 10 +++++----- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 82a01d447dc..f109a40fcba 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -544,6 +544,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ TimerTaskWorkflowTimeoutScope: {operation: "TimerTaskWorkflowTimeout"}, TimerTaskDeleteHistoryEvent: {operation: "TimerTaskDeleteHistoryEvent"}, HistoryEventNotificationScope: {operation: "HistoryEventNotification"}, + ReplicatorQueueProcessorScope: {operation: "ReplicatorQueueProcessor"}, + ReplicatorTaskHistoryScope: {operation: "ReplicatorTaskHistory"}, }, // Matching Scope Names Matching: { diff --git a/service/history/historyBuilder.go b/service/history/historyBuilder.go index d2f396a65c6..c976ff8a3eb 100644 --- a/service/history/historyBuilder.go +++ b/service/history/historyBuilder.go @@ -37,22 +37,19 @@ const ( type ( historyBuilder struct { - firstEventID int64 - nextEventID int64 - serializer persistence.HistorySerializer - history []*workflow.HistoryEvent - msBuilder *mutableStateBuilder - logger bark.Logger + serializer persistence.HistorySerializer + history 
[]*workflow.HistoryEvent + msBuilder *mutableStateBuilder + logger bark.Logger } ) func newHistoryBuilder(msBuilder *mutableStateBuilder, logger bark.Logger) *historyBuilder { return &historyBuilder{ - firstEventID: msBuilder.GetNextEventID(), - serializer: persistence.NewJSONHistorySerializer(), - history: []*workflow.HistoryEvent{}, - msBuilder: msBuilder, - logger: logger.WithField(logging.TagWorkflowComponent, logging.TagValueHistoryBuilderComponent), + serializer: persistence.NewJSONHistorySerializer(), + history: []*workflow.HistoryEvent{}, + msBuilder: msBuilder, + logger: logger.WithField(logging.TagWorkflowComponent, logging.TagValueHistoryBuilderComponent), } } @@ -419,7 +416,6 @@ func (b *historyBuilder) AddChildWorkflowExecutionTimedOutEvent(domain *string, func (b *historyBuilder) addEventToHistory(event *workflow.HistoryEvent) *workflow.HistoryEvent { b.history = append(b.history, event) - b.nextEventID = b.msBuilder.GetNextEventID() // Keep track of nextEventID for generating replication task return event } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 5c946f504bc..dfda1c39f8e 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -288,8 +288,8 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow } replicationTask := &persistence.HistoryReplicationTask{ - FirstEventID: msBuilder.hBuilder.firstEventID, - NextEventID: msBuilder.hBuilder.nextEventID, + FirstEventID: firstEventID, + NextEventID: msBuilder.GetNextEventID(), Version: failoverVersion, LastReplicationInfo: nil, } diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index 3aedb2f47e8..8903f5ec064 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -234,7 +234,7 @@ func (e *mutableStateBuilder) ApplyReplicationStateUpdates(failoverVersion int64 e.replicationState.CurrentVersion = failoverVersion 
e.replicationState.LastWriteVersion = failoverVersion // TODO: Rename this to NextEventID to stay consistent naming convention with rest of code base - e.replicationState.LastWriteEventID = e.hBuilder.nextEventID - 1 + e.replicationState.LastWriteEventID = e.GetNextEventID() - 1 } func (e *mutableStateBuilder) CloseUpdateSession(createReplicationTask bool) (*mutableStateSessionUpdates, error) { @@ -291,8 +291,8 @@ func (e *mutableStateBuilder) CloseUpdateSession(createReplicationTask bool) (*m func (e *mutableStateBuilder) createReplicationTask() *persistence.HistoryReplicationTask { return &persistence.HistoryReplicationTask{ - FirstEventID: e.hBuilder.firstEventID, - NextEventID: e.hBuilder.nextEventID, + FirstEventID: e.GetLastFirstEventID(), + NextEventID: e.GetNextEventID(), Version: e.replicationState.CurrentVersion, LastReplicationInfo: e.replicationState.LastReplicationInfo, } @@ -1644,8 +1644,8 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int } replicationTask := &persistence.HistoryReplicationTask{ - FirstEventID: newStateBuilder.hBuilder.firstEventID, - NextEventID: newStateBuilder.hBuilder.nextEventID, + FirstEventID: firstEventID, + NextEventID: newStateBuilder.GetNextEventID(), Version: failoverVersion, LastReplicationInfo: nil, }