Multiple bugfixes (#803)
* bugfix: properly handle processor stop 
* bugfix: do not refresh the shard when CreateWorkflowExecution fails with a workflow-already-started error
wxing1292 authored May 31, 2018
1 parent dd00c27 commit a7ebd05
Showing 6 changed files with 68 additions and 50 deletions.
11 changes: 11 additions & 0 deletions common/daemon.go
@@ -20,6 +20,17 @@

package common

const (
// used for background threads

// DaemonStatusInitialized coroutine pool initialized
DaemonStatusInitialized int32 = 0
// DaemonStatusStarted coroutine pool started
DaemonStatusStarted int32 = 1
// DaemonStatusStopped coroutine pool stopped
DaemonStatusStopped int32 = 2
)

type (
// Daemon is the base interfaces implemented by
// background tasks within cherami
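
For context, the commit reuses these constants to guard Start/Stop in several background components below. A minimal, self-contained sketch of that lifecycle pattern, assuming a hypothetical exampleDaemon type (only the three status values come from the commit):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Mirrors common.DaemonStatusInitialized / Started / Stopped.
const (
	daemonStatusInitialized int32 = 0
	daemonStatusStarted     int32 = 1
	daemonStatusStopped     int32 = 2
)

// exampleDaemon is a hypothetical background component using the constants.
type exampleDaemon struct {
	status     int32
	shutdownCh chan struct{}
	shutdownWG sync.WaitGroup
}

func newExampleDaemon() *exampleDaemon {
	return &exampleDaemon{
		status:     daemonStatusInitialized,
		shutdownCh: make(chan struct{}),
	}
}

// Start transitions Initialized -> Started exactly once; later calls are no-ops.
func (d *exampleDaemon) Start() {
	if !atomic.CompareAndSwapInt32(&d.status, daemonStatusInitialized, daemonStatusStarted) {
		return
	}
	d.shutdownWG.Add(1)
	go d.pump()
}

// Stop transitions Started -> Stopped exactly once, signals the pump, and waits for it.
func (d *exampleDaemon) Stop() {
	if !atomic.CompareAndSwapInt32(&d.status, daemonStatusStarted, daemonStatusStopped) {
		return
	}
	close(d.shutdownCh)
	d.shutdownWG.Wait()
}

func (d *exampleDaemon) pump() {
	defer d.shutdownWG.Done()
	<-d.shutdownCh
	fmt.Println("pump exiting")
}

func main() {
	d := newExampleDaemon()
	d.Start()
	d.Start() // no-op: the CAS from Initialized fails
	d.Stop()
	d.Stop() // no-op: the CAS from Started fails
}

Tracking a single status field rather than separate started/stopped flags also means Stop on a never-started daemon is a plain no-op, which is the behaviour the queue and timer processors below move to.
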
4 changes: 0 additions & 4 deletions service/history/historyEngine2_test.go
@@ -956,7 +956,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() {
requestID := "requestID"

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{
Msg: "random message",
StartRequestID: requestID,
@@ -1005,7 +1004,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
identity := "testIdentity"

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once()
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once()
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{
Msg: "random message",
StartRequestID: "oldRequestID",
@@ -1064,7 +1062,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
expecedErrs := []bool{true, false, true}

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs))
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs))
s.mockExecutionMgr.On(
"CreateWorkflowExecution",
mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }),
@@ -1152,7 +1149,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() {
for i, closeState := range closeStates {

s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs))
s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs))
s.mockExecutionMgr.On(
"CreateWorkflowExecution",
mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }),
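
The deleted lines are testify mock expectations: once a CreateWorkflowExecution failure with WorkflowExecutionAlreadyStartedError no longer leads to an UpdateShard call (see the shardContext.go change below), a .Once() expectation on that method would go unmet, which testify flags when the suite asserts its mock expectations. A stand-alone illustration of that mock pattern; the shardManagerMock type here is a simplified stand-in, not the suite's real mock:

package example

import (
	"testing"

	"github.com/stretchr/testify/mock"
)

// shardManagerMock is a simplified stand-in for the suite's mockShardManager.
type shardManagerMock struct {
	mock.Mock
}

func (m *shardManagerMock) UpdateShard(request interface{}) error {
	args := m.Called(request)
	return args.Error(0)
}

func TestRemovedExpectation(t *testing.T) {
	m := &shardManagerMock{}
	// This is the shape of the expectation the commit removes. If the code
	// under test never calls UpdateShard, AssertExpectations reports the
	// unmet .Once() expectation, so the expectation itself has to go.
	m.On("UpdateShard", mock.Anything).Return(nil).Once()
	_ = m.UpdateShard("fake request") // satisfy the expectation for this illustration
	m.AssertExpectations(t)
}
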
12 changes: 4 additions & 8 deletions service/history/historyEventNotifier.go
@@ -26,17 +26,13 @@ import (

"github.com/pborman/uuid"
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/metrics"
)

const (
eventsChanSize = 1000

// used for workflow pubsub status
statusIdle int32 = 0
statusStarted int32 = 1
statusStopped int32 = 2
)

type (
@@ -93,7 +89,7 @@ func newHistoryEventNotifier(metrics metrics.Client, workflowIDToShardID func(st
}
return &historyEventNotifierImpl{
metrics: metrics,
status: statusIdle,
status: common.DaemonStatusInitialized,
closeChan: make(chan bool),
eventsChan: make(chan *historyEventNotification, eventsChanSize),

@@ -213,14 +209,14 @@ func (notifier *historyEventNotifierImpl) dequeueHistoryEventNotifications() {
}

func (notifier *historyEventNotifierImpl) Start() {
if !atomic.CompareAndSwapInt32(&notifier.status, statusIdle, statusStarted) {
if !atomic.CompareAndSwapInt32(&notifier.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}
go notifier.dequeueHistoryEventNotifications()
}

func (notifier *historyEventNotifierImpl) Stop() {
if !atomic.CompareAndSwapInt32(&notifier.status, statusStarted, statusStopped) {
if !atomic.CompareAndSwapInt32(&notifier.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}
close(notifier.closeChan)
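
Beyond switching to the shared constants, the Start/Stop pair keeps its shape: Start launches a single dequeue goroutine and Stop closes closeChan to end it. A compact sketch of a consumer loop in that spirit, with simplified stand-in types rather than the real historyEventNotifierImpl:

package main

import "fmt"

type historyEvent struct{ id int64 }

// dequeueLoop dispatches buffered events until closeChan is closed by Stop.
// A shutdown that races ahead of still-buffered events is possible, as in any
// two-case select of this form.
func dequeueLoop(eventsChan <-chan *historyEvent, closeChan <-chan bool) {
	for {
		select {
		case e := <-eventsChan:
			fmt.Println("dispatching event", e.id)
		case <-closeChan:
			return
		}
	}
}

func main() {
	eventsChan := make(chan *historyEvent, 1000) // eventsChanSize in the real notifier
	closeChan := make(chan bool)
	done := make(chan struct{})

	go func() {
		dequeueLoop(eventsChan, closeChan)
		close(done)
	}()

	eventsChan <- &historyEvent{id: 1}
	close(closeChan) // what Stop does after its CAS succeeds
	<-done
}
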
39 changes: 26 additions & 13 deletions service/history/queueProcessor.go
@@ -32,6 +32,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
)

type (
@@ -60,8 +61,7 @@ type (
workerNotificationChans []chan struct{}

notifyCh chan struct{}
isStarted int32
isStopped int32
status int32
shutdownWG sync.WaitGroup
shutdownCh chan struct{}
}
@@ -83,6 +83,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p
processor: processor,
rateLimiter: common.NewTokenBucket(options.MaxPollRPS, common.NewRealTimeSource()),
workerNotificationChans: workerNotificationChans,
status: common.DaemonStatusInitialized,
notifyCh: make(chan struct{}, 1),
shutdownCh: make(chan struct{}),
metricsClient: shard.GetMetricsClient(),
@@ -94,7 +95,7 @@
}

func (p *queueProcessorBase) Start() {
if !atomic.CompareAndSwapInt32(&p.isStarted, 0, 1) {
if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

@@ -107,16 +108,14 @@
}

func (p *queueProcessorBase) Stop() {
if !atomic.CompareAndSwapInt32(&p.isStopped, 0, 1) {
if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

logging.LogQueueProcesorShuttingDownEvent(p.logger)
defer logging.LogQueueProcesorShutdownEvent(p.logger)

if atomic.LoadInt32(&p.isStarted) == 1 {
close(p.shutdownCh)
}
close(p.shutdownCh)

if success := common.AwaitWaitGroup(&p.shutdownWG, time.Minute); !success {
logging.LogQueueProcesorShutdownTimedoutEvent(p.logger)
@@ -151,7 +150,8 @@ processorPumpLoop:
case <-p.shutdownCh:
break processorPumpLoop
case <-p.ackMgr.getFinishedChan():
p.Stop()
// use a separate gorouting since the caller hold the shutdownWG
go p.Stop()
case <-p.notifyCh:
p.processBatch(tasksCh)
case <-pollTimer.C:
@@ -167,7 +167,7 @@ processorPumpLoop:
// This is the only pump which writes to tasksCh, so it is safe to close channel here
close(tasksCh)
if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
p.logger.Warn("Queue processor timed out on worker shutdown.")
p.logger.Warn("Queue processor timedout on worker shutdown.")
}
updateAckTimer.Stop()
pollTimer.Stop()
@@ -210,11 +210,12 @@ func (p *queueProcessorBase) taskWorker(tasksCh <-chan queueTaskInfo, notificati

for {
select {
case <-p.shutdownCh:
return
case task, ok := <-tasksCh:
if !ok {
return
}

p.processWithRetry(notificationChan, task)
}
}
@@ -230,7 +231,13 @@ func (p *queueProcessorBase) retryTasks() {
}

func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{}, task queueTaskInfo) {
p.logger.Debugf("Processing task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
switch task.(type) {
case *persistence.TransferTaskInfo:
p.logger.Debugf("Processing transfer task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
case *persistence.ReplicationTaskInfo:
p.logger.Debugf("Processing replication task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
}

ProcessRetryLoop:
for retryCount := 1; retryCount <= p.options.MaxRetryCount; {
select {
@@ -260,6 +267,12 @@
}

// All attempts to process transfer task failed. We won't be able to move the ackLevel so panic
logging.LogOperationPanicEvent(p.logger,
fmt.Sprintf("Retry count exceeded for taskID: %v", task.GetTaskID()), nil)
switch task.(type) {
case *persistence.TransferTaskInfo:
logging.LogOperationPanicEvent(p.logger,
fmt.Sprintf("Retry count exceeded for transfer taskID: %v", task.GetTaskID()), nil)
case *persistence.ReplicationTaskInfo:
logging.LogOperationPanicEvent(p.logger,
fmt.Sprintf("Retry count exceeded for replication taskID: %v", task.GetTaskID()), nil)
}
}
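
The key change in this file is that the processor pump now stops itself via go p.Stop() rather than a direct call: Stop waits on shutdownWG, and the pump goroutine holds that wait group, so a synchronous call would deadlock. A minimal, self-contained sketch of the pattern, using a hypothetical reduction of queueProcessorBase:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type processor struct {
	status     int32 // DaemonStatusInitialized / Started / Stopped
	finishedCh chan struct{}
	shutdownCh chan struct{}
	shutdownWG sync.WaitGroup
}

func newProcessor() *processor {
	return &processor{
		finishedCh: make(chan struct{}),
		shutdownCh: make(chan struct{}),
	}
}

func (p *processor) Start() {
	if !atomic.CompareAndSwapInt32(&p.status, 0, 1) {
		return
	}
	p.shutdownWG.Add(1)
	go p.pump()
}

func (p *processor) Stop() {
	if !atomic.CompareAndSwapInt32(&p.status, 1, 2) {
		return
	}
	close(p.shutdownCh)
	// Stop blocks until the pump exits. If the pump called Stop synchronously,
	// it would wait on its own Done() here and never return.
	p.shutdownWG.Wait()
}

func (p *processor) pump() {
	defer p.shutdownWG.Done()
	for {
		select {
		case <-p.shutdownCh:
			return
		case <-p.finishedCh:
			// The ack manager reports all tasks finished: shut down, but from
			// a separate goroutine so the Wait() in Stop can complete.
			go p.Stop()
		}
	}
}

func main() {
	p := newProcessor()
	p.Start()
	p.finishedCh <- struct{}{} // simulate the ack manager signalling completion
	p.shutdownWG.Wait()        // returns once the pump has fully exited
	fmt.Println("processor stopped cleanly")
}

The diff also drops the isStarted check around close(p.shutdownCh): with a single status field, the close only runs after a successful Started-to-Stopped transition, so Stop on a never-started processor is simply a no-op.
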
4 changes: 3 additions & 1 deletion service/history/shardContext.go
@@ -262,7 +262,9 @@ Create_Loop:
response, err := s.executionManager.CreateWorkflowExecution(request)
if err != nil {
switch err.(type) {
case *shared.WorkflowExecutionAlreadyStartedError, *shared.ServiceBusyError:
case *shared.WorkflowExecutionAlreadyStartedError,
*persistence.WorkflowExecutionAlreadyStartedError,
*shared.ServiceBusyError:
// No special handling required for these errors
case *persistence.ShardOwnershipLostError:
{
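
This is the second bugfix: a persistence.WorkflowExecutionAlreadyStartedError returned by CreateWorkflowExecution is now grouped with the errors that need no special shard handling, instead of falling through to the branch that refreshes the shard (which is what the commit message says the fix avoids). A reduced sketch of that classification with stand-in error types; the real switch operates on the gen/shared and persistence error structs and on shard state not shown here:

package main

import (
	"errors"
	"fmt"
)

// Stand-in error types for the illustration.
type workflowExecutionAlreadyStartedError struct{ msg string }

func (e *workflowExecutionAlreadyStartedError) Error() string { return e.msg }

type shardOwnershipLostError struct{ msg string }

func (e *shardOwnershipLostError) Error() string { return e.msg }

// classifyCreateError sketches the switch in shardContext.CreateWorkflowExecution:
// "already started" is a business error the caller handles, so the shard should
// not be refreshed for it; ownership loss and unexpected errors are still treated
// as shard-level problems.
func classifyCreateError(err error) string {
	switch err.(type) {
	case *workflowExecutionAlreadyStartedError:
		return "return error to caller; no shard refresh"
	case *shardOwnershipLostError:
		return "shard ownership lost; stop using this shard"
	default:
		return "unexpected error; refresh shard state"
	}
}

func main() {
	fmt.Println(classifyCreateError(&workflowExecutionAlreadyStartedError{msg: "workflow already started"}))
	fmt.Println(classifyCreateError(&shardOwnershipLostError{msg: "shard range stolen"}))
	fmt.Println(classifyCreateError(errors.New("storage timeout")))
}
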
48 changes: 24 additions & 24 deletions service/history/timerQueueProcessorBase.go
@@ -22,6 +22,7 @@ package history

import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
@@ -50,8 +51,7 @@ type (
historyService *historyEngineImpl
cache *historyCache
executionManager persistence.ExecutionManager
isStarted int32
isStopped int32
status int32
shutdownWG sync.WaitGroup
shutdownCh chan struct{}
tasksCh chan *persistence.TimerTaskInfo
@@ -88,6 +88,7 @@ func newTimerQueueProcessorBase(shard ShardContext, historyService *historyEngin
historyService: historyService,
cache: historyService.historyCache,
executionManager: shard.GetExecutionManager(),
status: common.DaemonStatusInitialized,
shutdownCh: make(chan struct{}),
tasksCh: make(chan *persistence.TimerTaskInfo, 10*shard.GetConfig().TimerTaskBatchSize),
config: shard.GetConfig(),
@@ -103,7 +104,7 @@
}

func (t *timerQueueProcessorBase) Start() {
if !atomic.CompareAndSwapInt32(&t.isStarted, 0, 1) {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}

@@ -114,16 +115,14 @@ func (t *timerQueueProcessorBase) Start() {
}

func (t *timerQueueProcessorBase) Stop() {
if !atomic.CompareAndSwapInt32(&t.isStopped, 0, 1) {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}

if atomic.LoadInt32(&t.isStarted) == 1 {
close(t.shutdownCh)
}
close(t.shutdownCh)

if success := common.AwaitWaitGroup(&t.shutdownWG, time.Minute); !success {
t.logger.Warn("Timer queue processor timed out on shutdown.")
t.logger.Warn("Timer queue processor timedout on shutdown.")
}

t.logger.Info("Timer queue processor stopped.")
@@ -132,24 +131,17 @@ func (t *timerQueueProcessorBase) Stop() {
func (t *timerQueueProcessorBase) processorPump() {
defer t.shutdownWG.Done()

// Workers to process timer tasks that are expired.

var workerWG sync.WaitGroup
for i := 0; i < t.config.TimerTaskWorkerCount; i++ {
workerWG.Add(1)
notificationChan := t.workerNotificationChans[i]
go t.processTaskWorker(&workerWG, notificationChan)
go t.taskWorker(&workerWG, notificationChan)
}

RetryProcessor:
for {
select {
case <-t.shutdownCh:
t.logger.Info("Timer queue processor pump shutting down.")
close(t.tasksCh)
if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
t.logger.Warn("Timer queue processor timed out on worker shutdown.")
}
break RetryProcessor
default:
err := t.internalProcessor()
@@ -158,14 +150,23 @@
}
}
}

t.logger.Info("Timer queue processor pump shutting down.")
// This is the only pump which writes to tasksCh, so it is safe to close channel here
close(t.tasksCh)
if success := common.AwaitWaitGroup(&workerWG, 10*time.Second); !success {
t.logger.Warn("Timer queue processor timedout on worker shutdown.")
}
t.logger.Info("Timer processor exiting.")
}

func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup, notificationChan chan struct{}) {
func (t *timerQueueProcessorBase) taskWorker(workerWG *sync.WaitGroup, notificationChan chan struct{}) {
defer workerWG.Done()

for {
select {
case <-t.shutdownCh:
return
case task, ok := <-t.tasksCh:
if !ok {
return
@@ -176,6 +177,7 @@ func (t *timerQueueProcessorBase) processTaskWorker(workerWG *sync.WaitGroup, no
}

func (t *timerQueueProcessorBase) processWithRetry(notificationChan <-chan struct{}, task *persistence.TimerTaskInfo) {
t.logger.Debugf("Processing timer task: %v, type: %v", task.GetTaskID(), task.GetTaskType())
ProcessRetryLoop:
for attempt := 1; attempt <= t.config.TimerTaskMaxRetryCount; {
select {
@@ -204,6 +206,9 @@
return
}
}
// All attempts to process transfer task failed. We won't be able to move the ackLevel so panic
logging.LogOperationPanicEvent(t.logger,
fmt.Sprintf("Retry count exceeded for timer taskID: %v", task.GetTaskID()), nil)
}

// NotifyNewTimers - Notify the processor about the new timer events arrival.
@@ -270,28 +275,23 @@ continueProcessor:
// 4. updating ack level
//
select {

case <-t.shutdownCh:
t.logger.Debug("Timer queue processor pump shutting down.")
return nil

case <-t.timerQueueAckMgr.getFinishedChan():
// timer queue ack manager indicate that all task scanned
// are finished and no more tasks
t.Stop()
// use a separate gorouting since the caller hold the shutdownWG
go t.Stop()
return nil

case <-timerGate.FireChan():
// Timer Fired.

case <-pollTimer.C:
// forced timer scan
pollTimer.Reset(t.config.TimerProcessorMaxPollInterval)

case <-updateAckChan:
t.timerQueueAckMgr.updateAckLevel()
continue continueProcessor

case <-t.newTimerCh:
t.newTimeLock.Lock()
newTime := t.newTime
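
The timer-queue changes mirror the queue processor ones: the renamed taskWorker now also selects on shutdownCh so it exits promptly on Stop, the pump closes tasksCh only after its loop exits (it is the sole writer), and self-initiated shutdown goes through go t.Stop() for the same wait-group reason as above. A small sketch of the worker side of that contract, with simplified stand-in types and names:

package main

import (
	"fmt"
	"sync"
)

type timerTask struct{ id int64 }

// taskWorker exits either when Stop closes shutdownCh or when the pump, as the
// only writer, closes tasksCh after its own loop has finished.
func taskWorker(wg *sync.WaitGroup, shutdownCh <-chan struct{}, tasksCh <-chan *timerTask) {
	defer wg.Done()
	for {
		select {
		case <-shutdownCh:
			return
		case task, ok := <-tasksCh:
			if !ok {
				return
			}
			fmt.Println("processing timer task", task.id)
		}
	}
}

func main() {
	shutdownCh := make(chan struct{})
	tasksCh := make(chan *timerTask, 10)

	var wg sync.WaitGroup
	wg.Add(1)
	go taskWorker(&wg, shutdownCh, tasksCh)

	tasksCh <- &timerTask{id: 1}
	close(tasksCh) // pump exiting: safe because it is the only writer
	wg.Wait()      // worker drained the buffered task and saw the closed channel
}
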
