Skip to content

Commit

Permalink
Replication task processor bugfixes (#635)
Browse files Browse the repository at this point in the history
Use mutableStateBuilder.GetNextEventID rather than managing event ID on
the history builder for generation of replication tasks.  Managing the
next event ID on history builder does not account for buffered events
and will cause incorrect next event ID set on the replication task.

Fixed missing metric scope definition for replication queue processor.

Use GetLastFirstEventID instead of managing custom state in history builder
  • Loading branch information
samarabbas authored Mar 27, 2018
1 parent 45a8fa1 commit c444cac
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 19 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
TimerTaskWorkflowTimeoutScope: {operation: "TimerTaskWorkflowTimeout"},
TimerTaskDeleteHistoryEvent: {operation: "TimerTaskDeleteHistoryEvent"},
HistoryEventNotificationScope: {operation: "HistoryEventNotification"},
ReplicatorQueueProcessorScope: {operation: "ReplicatorQueueProcessor"},
ReplicatorTaskHistoryScope: {operation: "ReplicatorTaskHistory"},
},
// Matching Scope Names
Matching: {
Expand Down
20 changes: 8 additions & 12 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,19 @@ const (

type (
historyBuilder struct {
firstEventID int64
nextEventID int64
serializer persistence.HistorySerializer
history []*workflow.HistoryEvent
msBuilder *mutableStateBuilder
logger bark.Logger
serializer persistence.HistorySerializer
history []*workflow.HistoryEvent
msBuilder *mutableStateBuilder
logger bark.Logger
}
)

func newHistoryBuilder(msBuilder *mutableStateBuilder, logger bark.Logger) *historyBuilder {
return &historyBuilder{
firstEventID: msBuilder.GetNextEventID(),
serializer: persistence.NewJSONHistorySerializer(),
history: []*workflow.HistoryEvent{},
msBuilder: msBuilder,
logger: logger.WithField(logging.TagWorkflowComponent, logging.TagValueHistoryBuilderComponent),
serializer: persistence.NewJSONHistorySerializer(),
history: []*workflow.HistoryEvent{},
msBuilder: msBuilder,
logger: logger.WithField(logging.TagWorkflowComponent, logging.TagValueHistoryBuilderComponent),
}
}

Expand Down Expand Up @@ -419,7 +416,6 @@ func (b *historyBuilder) AddChildWorkflowExecutionTimedOutEvent(domain *string,

func (b *historyBuilder) addEventToHistory(event *workflow.HistoryEvent) *workflow.HistoryEvent {
b.history = append(b.history, event)
b.nextEventID = b.msBuilder.GetNextEventID() // Keep track of nextEventID for generating replication task
return event
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
}

replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: msBuilder.hBuilder.firstEventID,
NextEventID: msBuilder.hBuilder.nextEventID,
FirstEventID: firstEventID,
NextEventID: msBuilder.GetNextEventID(),
Version: failoverVersion,
LastReplicationInfo: nil,
}
Expand Down
10 changes: 5 additions & 5 deletions service/history/mutableStateBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (e *mutableStateBuilder) ApplyReplicationStateUpdates(failoverVersion int64
e.replicationState.CurrentVersion = failoverVersion
e.replicationState.LastWriteVersion = failoverVersion
// TODO: Rename this to NextEventID to stay consistent naming convention with rest of code base
e.replicationState.LastWriteEventID = e.hBuilder.nextEventID - 1
e.replicationState.LastWriteEventID = e.GetNextEventID() - 1
}

func (e *mutableStateBuilder) CloseUpdateSession(createReplicationTask bool) (*mutableStateSessionUpdates, error) {
Expand Down Expand Up @@ -291,8 +291,8 @@ func (e *mutableStateBuilder) CloseUpdateSession(createReplicationTask bool) (*m

func (e *mutableStateBuilder) createReplicationTask() *persistence.HistoryReplicationTask {
return &persistence.HistoryReplicationTask{
FirstEventID: e.hBuilder.firstEventID,
NextEventID: e.hBuilder.nextEventID,
FirstEventID: e.GetLastFirstEventID(),
NextEventID: e.GetNextEventID(),
Version: e.replicationState.CurrentVersion,
LastReplicationInfo: e.replicationState.LastReplicationInfo,
}
Expand Down Expand Up @@ -1644,8 +1644,8 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int
}

replicationTask := &persistence.HistoryReplicationTask{
FirstEventID: newStateBuilder.hBuilder.firstEventID,
NextEventID: newStateBuilder.hBuilder.nextEventID,
FirstEventID: firstEventID,
NextEventID: newStateBuilder.GetNextEventID(),
Version: failoverVersion,
LastReplicationInfo: nil,
}
Expand Down

0 comments on commit c444cac

Please sign in to comment.