Handle deletion of task during failover #976

Merged · 9 commits · Jul 24, 2018
16 changes: 16 additions & 0 deletions common/persistence/dataInterfaces.go
@@ -143,9 +143,25 @@ type (
TimerAckLevel time.Time
ClusterTransferAckLevel map[string]int64
ClusterTimerAckLevel map[string]time.Time
TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel
DomainNotificationVersion int64
}

// TransferFailoverLevel contains corresponding start / end level
TransferFailoverLevel struct {
Contributor: Can we also include CurrentLevel as a separate field to checkpoint current progress.

Contributor Author: sure

MinLevel int64
MaxLevel int64
DomainIDs []string
}

// TimerFailoverLevel contains domain IDs and corresponding start / end level
TimerFailoverLevel struct {
Contributor: Can we also include CurrentLevel as a separate field to checkpoint current progress.

Contributor Author: sure

MinLevel time.Time
MaxLevel time.Time
DomainIDs []string
}
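
The threads above suggest checkpointing progress inside the failover range. A minimal sketch of what the two structs might look like with such a CurrentLevel field added; the extra field and the inline comments illustrate the reviewer's suggestion and are not the shape shown in this diff:

	// TransferFailoverLevel contains corresponding start / end level
	TransferFailoverLevel struct {
		MinLevel     int64    // lower bound of the failover task range
		CurrentLevel int64    // checkpoint of current progress (reviewer suggestion)
		MaxLevel     int64    // upper bound of the failover task range
		DomainIDs    []string // domains covered by this failover pass
	}

	// TimerFailoverLevel contains domain IDs and corresponding start / end level
	TimerFailoverLevel struct {
		MinLevel     time.Time
		CurrentLevel time.Time // checkpoint of current progress (reviewer suggestion)
		MaxLevel     time.Time
		DomainIDs    []string
	}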

// WorkflowExecutionInfo describes a workflow execution
WorkflowExecutionInfo struct {
DomainID string
13 changes: 13 additions & 0 deletions service/history/MockProcessor.go
@@ -97,3 +97,16 @@ func (_m *MockProcessor) updateAckLevel(taskID int64) error {
}
return r0
}

// queueShutdown is mock implementation for queueShutdown of Processor
func (_m *MockProcessor) queueShutdown() error {
ret := _m.Called()

var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
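
As with the other Processor methods on this mock, tests arm queueShutdown through a testify expectation; the failover ack-manager test further down in this diff does exactly that before driving the final updateQueueAckLevel:

	s.mockProcessor.On("queueShutdown").Return(nil)
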
1 change: 1 addition & 0 deletions service/history/historyEngineInterfaces.go
@@ -112,6 +112,7 @@ type (
readTasks(readLevel int64) ([]queueTaskInfo, bool, error)
completeTask(taskID int64) error
updateAckLevel(taskID int64) error
queueShutdown() error
}

transferQueueProcessor interface {
60 changes: 60 additions & 0 deletions service/history/historyTestBase.go
@@ -246,6 +246,66 @@ func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel t
return nil
}

// UpdateTransferFailoverLevel test implementation
func (s *TestShardContext) UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error {
s.Lock()
defer s.Unlock()

s.shardInfo.TransferFailoverLevels[failoverID] = level
return nil
}

// DeleteTransferFailoverLevel test implementation
func (s *TestShardContext) DeleteTransferFailoverLevel(failoverID string) error {
s.Lock()
defer s.Unlock()

delete(s.shardInfo.TransferFailoverLevels, failoverID)
return nil
}

// GetAllTransferFailoverLevels test implementation
func (s *TestShardContext) GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel {
s.RLock()
defer s.RUnlock()

ret := map[string]persistence.TransferFailoverLevel{}
for k, v := range s.shardInfo.TransferFailoverLevels {
ret[k] = v
}
return ret
}

// UpdateTimerFailoverLevel test implementation
func (s *TestShardContext) UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error {
s.Lock()
defer s.Unlock()

s.shardInfo.TimerFailoverLevels[failoverID] = level
return nil
}

// DeleteTimerFailoverLevel test implementation
func (s *TestShardContext) DeleteTimerFailoverLevel(failoverID string) error {
s.Lock()
defer s.Unlock()

delete(s.shardInfo.TimerFailoverLevels, failoverID)
return nil
}

// GetAllTimerFailoverLevels test implementation
func (s *TestShardContext) GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel {
s.RLock()
defer s.RUnlock()

ret := map[string]persistence.TimerFailoverLevel{}
for k, v := range s.shardInfo.TimerFailoverLevels {
ret[k] = v
}
return ret
}

// GetDomainNotificationVersion test implementation
func (s *TestShardContext) GetDomainNotificationVersion() int64 {
s.RLock()
17 changes: 8 additions & 9 deletions service/history/queueAckMgr.go
@@ -191,19 +191,18 @@ MoveAckLevelLoop:
a.ackLevel = ackLevel

if a.isFailover && a.isReadFinished && len(a.outstandingTasks) == 0 {
a.Unlock()
// this means in failover mode, all possible failover transfer tasks
// are processed and we are free to shutdown
a.logger.Debugf("Queue ack manager shutdoen.")
a.logger.Debugf("Queue ack manager shutdown.")
a.finishedChan <- struct{}{}
a.processor.queueShutdown()
return
}
a.Unlock()

if !a.isFailover {
if err := a.processor.updateAckLevel(ackLevel); err != nil {
a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter)
logging.LogOperationFailedEvent(a.logger, "Error updating ack level for shard", err)
}
} else {
// TODO deal with failover ack level persistence, issue #646
a.Unlock()
if err := a.processor.updateAckLevel(ackLevel); err != nil {
a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter)
logging.LogOperationFailedEvent(a.logger, "Error updating ack level for shard", err)
}
}
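
With this change, once a failover pass has read everything in its range and all outstanding tasks are acked, the ack manager notifies the processor via queueShutdown instead of silently stopping. A rough sketch of what a failover-specific processor could do in that hook, assuming it cleans up the checkpoint stored through the new ShardContext failover-level APIs; the processor type and its fields here are illustrative, not code from this PR:

	// transferFailoverProcessor is a hypothetical failover-scoped processor.
	type transferFailoverProcessor struct {
		shard      ShardContext
		failoverID string
	}

	// queueShutdown runs after every task in [MinLevel, MaxLevel) has been
	// acked; dropping the persisted level keeps a finished failover from
	// being resumed when the shard is re-acquired.
	func (p *transferFailoverProcessor) queueShutdown() error {
		return p.shard.DeleteTransferFailoverLevel(p.failoverID)
	}
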
6 changes: 4 additions & 2 deletions service/history/queueAckMgr_test.go
@@ -358,7 +358,7 @@ func (s *queueFailoverAckMgrSuite) TearDownTest() {

}

func (s *queueFailoverAckMgrSuite) TestReadTimerTasks() {
func (s *queueFailoverAckMgrSuite) TestReadQueueTasks() {
readLevel := s.queueFailoverAckMgr.readLevel
// when the ack manager is first initialized, read == ack level
s.Equal(s.queueFailoverAckMgr.getQueueAckLevel(), readLevel)
@@ -410,7 +410,7 @@ func (s *queueFailoverAckMgrSuite) TestReadTimerTasks() {
s.True(s.queueFailoverAckMgr.isReadFinished)
}

func (s *queueFailoverAckMgrSuite) TestReadCompleteTimerTasks() {
func (s *queueFailoverAckMgrSuite) TestReadCompleteQueueTasks() {
readLevel := s.queueFailoverAckMgr.readLevel
// when the ack manager is first initialized, read == ack level
s.Equal(s.queueFailoverAckMgr.getQueueAckLevel(), readLevel)
@@ -450,6 +450,7 @@ func (s *queueFailoverAckMgrSuite) TestReadCompleteTimerTasks() {
s.mockProcessor.On("completeTask", taskID2).Return(nil).Once()
s.queueFailoverAckMgr.completeQueueTask(taskID2)
s.Equal(map[int64]bool{taskID1: false, taskID2: true}, s.queueFailoverAckMgr.outstandingTasks)
s.mockProcessor.On("updateAckLevel", s.queueFailoverAckMgr.getQueueAckLevel()).Return(nil)
s.queueFailoverAckMgr.updateQueueAckLevel()
select {
case <-s.queueFailoverAckMgr.getFinishedChan():
@@ -460,6 +461,7 @@ func (s *queueFailoverAckMgrSuite) TestReadCompleteTimerTasks() {
s.mockProcessor.On("completeTask", taskID1).Return(nil).Once()
s.queueFailoverAckMgr.completeQueueTask(taskID1)
s.Equal(map[int64]bool{taskID1: true, taskID2: true}, s.queueFailoverAckMgr.outstandingTasks)
s.mockProcessor.On("queueShutdown").Return(nil)
s.queueFailoverAckMgr.updateQueueAckLevel()
select {
case <-s.queueFailoverAckMgr.getFinishedChan():
11 changes: 6 additions & 5 deletions service/history/queueProcessor.go
@@ -156,7 +156,10 @@ func (p *queueProcessorBase) processorPump() {

jitter := backoff.NewJitter()
pollTimer := time.NewTimer(jitter.JitDuration(p.options.MaxPollInterval(), p.options.MaxPollIntervalJitterCoefficient()))
updateAckTimer := time.NewTimer(p.options.UpdateAckInterval())
defer pollTimer.Stop()

updateAckTicker := time.NewTicker(p.options.UpdateAckInterval())
defer updateAckTicker.Stop()

processorPumpLoop:
for {
@@ -173,9 +176,8 @@
if p.lastPollTime.Add(p.options.MaxPollInterval()).Before(time.Now()) {
p.processBatch(tasksCh)
}
case <-updateAckTimer.C:
case <-updateAckTicker.C:
p.ackMgr.updateQueueAckLevel()
updateAckTimer = time.NewTimer(p.options.UpdateAckInterval())
}
}

@@ -185,8 +187,7 @@
if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
p.logger.Warn("Queue processor timedout on worker shutdown.")
}
updateAckTimer.Stop()
pollTimer.Stop()

}

func (p *queueProcessorBase) processBatch(tasksCh chan<- queueTaskInfo) {
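
The pump previously re-armed a one-shot updateAckTimer after every fire; a time.Ticker with a deferred Stop gives the same periodic behavior without per-iteration re-arming and is cleaned up on every exit path. A generic illustration of the pattern, not the PR's exact pump loop (shutdownCh, ackMgr and updateAckInterval stand in for the real fields):

	updateAckTicker := time.NewTicker(updateAckInterval)
	defer updateAckTicker.Stop() // stopped on any return, not just the normal fallthrough

	for {
		select {
		case <-shutdownCh:
			return
		case <-updateAckTicker.C:
			ackMgr.updateQueueAckLevel() // fires repeatedly, no re-arm needed
		}
	}
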
5 changes: 5 additions & 0 deletions service/history/replicatorQueueProcessor.go
@@ -126,6 +126,11 @@ func (p *replicatorQueueProcessorImpl) process(qTask queueTaskInfo) error {
return err
}

func (p *replicatorQueueProcessorImpl) queueShutdown() error {
// there is no shutdown specific behavior for replication queue
return nil
}

func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persistence.ReplicationTaskInfo) error {
p.metricsClient.IncCounter(metrics.ReplicatorTaskHistoryScope, metrics.TaskRequests)
sw := p.metricsClient.StartTimer(metrics.ReplicatorTaskHistoryScope, metrics.TaskLatency)
70 changes: 70 additions & 0 deletions service/history/shardContext.go
@@ -58,6 +58,12 @@ type (
UpdateTimerAckLevel(ackLevel time.Time) error
GetTimerClusterAckLevel(cluster string) time.Time
UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error
DeleteTransferFailoverLevel(failoverID string) error
GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel
UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error
DeleteTimerFailoverLevel(failoverID string) error
GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel
GetDomainNotificationVersion() int64
UpdateDomainNotificationVersion(domainNotificationVersion int64) error
CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
@@ -232,6 +238,60 @@ func (s *shardContextImpl) UpdateTimerClusterAckLevel(cluster string, ackLevel t
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) UpdateTransferFailoverLevel(failoverID string, level persistence.TransferFailoverLevel) error {
s.Lock()
defer s.Unlock()

s.shardInfo.TransferFailoverLevels[failoverID] = level
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) DeleteTransferFailoverLevel(failoverID string) error {
s.Lock()
defer s.Unlock()

delete(s.shardInfo.TransferFailoverLevels, failoverID)
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetAllTransferFailoverLevels() map[string]persistence.TransferFailoverLevel {
s.RLock()
defer s.RUnlock()

ret := map[string]persistence.TransferFailoverLevel{}
for k, v := range s.shardInfo.TransferFailoverLevels {
ret[k] = v
}
return ret
}

func (s *shardContextImpl) UpdateTimerFailoverLevel(failoverID string, level persistence.TimerFailoverLevel) error {
s.Lock()
defer s.Unlock()

s.shardInfo.TimerFailoverLevels[failoverID] = level
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) DeleteTimerFailoverLevel(failoverID string) error {
s.Lock()
defer s.Unlock()

delete(s.shardInfo.TimerFailoverLevels, failoverID)
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetAllTimerFailoverLevels() map[string]persistence.TimerFailoverLevel {
s.RLock()
defer s.RUnlock()

ret := map[string]persistence.TimerFailoverLevel{}
for k, v := range s.shardInfo.TimerFailoverLevels {
ret[k] = v
}
return ret
}

func (s *shardContextImpl) GetDomainNotificationVersion() int64 {
s.RLock()
defer s.RUnlock()
@@ -756,6 +816,14 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha
}

func copyShardInfo(shardInfo *persistence.ShardInfo) *persistence.ShardInfo {
transferFailoverLevels := map[string]persistence.TransferFailoverLevel{}
for k, v := range shardInfo.TransferFailoverLevels {
transferFailoverLevels[k] = v
}
timerFailoverLevels := map[string]persistence.TimerFailoverLevel{}
for k, v := range shardInfo.TimerFailoverLevels {
timerFailoverLevels[k] = v
}
clusterTransferAckLevel := make(map[string]int64)
for k, v := range shardInfo.ClusterTransferAckLevel {
clusterTransferAckLevel[k] = v
@@ -772,6 +840,8 @@ func copyShardInfo(shardInfo *persistence.ShardInfo) *persistence.ShardInfo {
ReplicationAckLevel: shardInfo.ReplicationAckLevel,
TransferAckLevel: shardInfo.TransferAckLevel,
TimerAckLevel: shardInfo.TimerAckLevel,
TransferFailoverLevels: transferFailoverLevels,
TimerFailoverLevels: timerFailoverLevels,
ClusterTransferAckLevel: clusterTransferAckLevel,
ClusterTimerAckLevel: clusterTimerAckLevel,
DomainNotificationVersion: shardInfo.DomainNotificationVersion,
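
Taken together, the new ShardContext methods give each failover pass a persisted checkpoint keyed by a uuid, and copyShardInfo now carries those maps across shard updates. A rough sketch of the intended lifecycle; startTransferFailover and the uuid helper are assumptions for illustration, only the ShardContext calls come from this PR:

	// startTransferFailover is a hypothetical driver for one failover pass.
	func startTransferFailover(shard ShardContext, domainIDs []string, minLevel, maxLevel int64) error {
		failoverID := uuid.New() // one checkpoint entry per pass, keyed by uuid

		// Persist the range up front so an interrupted pass can be found again
		// via GetAllTransferFailoverLevels() when the shard is re-acquired.
		err := shard.UpdateTransferFailoverLevel(failoverID, persistence.TransferFailoverLevel{
			MinLevel:  minLevel,
			MaxLevel:  maxLevel,
			DomainIDs: domainIDs,
		})
		if err != nil {
			return err
		}

		// ... run the failover queue processor over [minLevel, maxLevel) ...

		// Once the pass drains (queueShutdown), the checkpoint is removed.
		return shard.DeleteTransferFailoverLevel(failoverID)
	}
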
8 changes: 8 additions & 0 deletions service/history/shardController_test.go
@@ -159,6 +159,8 @@ func (s *shardControllerSuite) TestAcquireShardSuccess() {
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
TimerFailoverLevels: map[string]persistence.TimerFailoverLevel{},
},
PreviousRangeID: 5,
}).Return(nil).Once()
@@ -247,6 +249,8 @@ func (s *shardControllerSuite) TestAcquireShardRenewSuccess() {
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
TimerFailoverLevels: map[string]persistence.TimerFailoverLevel{},
},
PreviousRangeID: 5,
}).Return(nil).Once()
@@ -320,6 +324,8 @@ func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() {
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
TimerFailoverLevels: map[string]persistence.TimerFailoverLevel{},
},
PreviousRangeID: 5,
}).Return(nil).Once()
@@ -614,6 +620,8 @@ func (s *shardControllerSuite) setupMocksForAcquireShard(shardID int, mockEngine
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
TransferFailoverLevels: map[string]persistence.TransferFailoverLevel{},
TimerFailoverLevels: map[string]persistence.TimerFailoverLevel{},
},
PreviousRangeID: currentRangeID,
}).Return(nil).Once()