Skip to content

Commit

Permalink
make HandleAction more robust
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jul 30, 2020
1 parent 4982dae commit 1ced8db
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
12 changes: 7 additions & 5 deletions service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package queue

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -309,12 +308,15 @@ func (t *timerQueueProcessor) HandleAction(clusterName string, action *Action) (
}

if !added {
return nil, errors.New("queue processor has been shutdown")
return nil, errProcessorShutdown
}

resultNotification := <-resultNotificationCh

return resultNotification.result, resultNotification.err
select {
case resultNotification := <-resultNotificationCh:
return resultNotification.result, resultNotification.err
case <-t.shutdownChan:
return nil, errProcessorShutdown
}
}

func (t *timerQueueProcessor) LockTaskProcessing() {
Expand Down
12 changes: 8 additions & 4 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (

var (
errUnexpectedQueueTask = errors.New("unexpected queue task")
errProcessorShutdown = errors.New("queue processor has been shutdown")

maxTransferReadLevel = newTransferTaskKey(math.MaxInt64)
)
Expand Down Expand Up @@ -320,12 +321,15 @@ func (t *transferQueueProcessor) HandleAction(clusterName string, action *Action
}

if !added {
return nil, errors.New("queue processor has been shutdown")
return nil, errProcessorShutdown
}

resultNotification := <-resultNotificationCh

return resultNotification.result, resultNotification.err
select {
case resultNotification := <-resultNotificationCh:
return resultNotification.result, resultNotification.err
case <-t.shutdownChan:
return nil, errProcessorShutdown
}
}

func (t *transferQueueProcessor) LockTaskProcessing() {
Expand Down

0 comments on commit 1ced8db

Please sign in to comment.