Fix task loading for multi-cursor transfer queue processor (#3298)
* Add a retry policy for reading transfer tasks
* Send a new task notification when the active processing queue changes
* Add a new read level parameter to the AddTasks() method (sketched after the file summary below).
yycptt authored Jun 2, 2020
1 parent 526a96d commit d787ccc
Showing 9 changed files with 293 additions and 103 deletions.
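
Context for the diffs below: `AddTasks()` now receives the exact level the caller read up to, instead of a `more bool` flag from which the queue had to guess its new read level. A minimal, self-contained sketch of that contract — simplified stand-ins, not the real `task.Key` / `ProcessingQueue` types:

```go
package main

import "fmt"

// key is a simplified, comparable stand-in for task.Key.
type key int

// queueState mirrors the ack/read/max levels a processing queue tracks.
type queueState struct {
	ackLevel  key
	readLevel key
	maxLevel  key
}

// addTasks records the loaded tasks and moves the read level to exactly where
// the caller stopped reading, instead of inferring it from a "more" flag.
func (s *queueState) addTasks(outstanding map[key]bool, tasks []key, newReadLevel key) {
	for _, k := range tasks {
		outstanding[k] = true
	}
	s.readLevel = newReadLevel
}

func main() {
	s := &queueState{ackLevel: 1, readLevel: 1, maxLevel: 10}
	outstanding := map[key]bool{}

	// Tasks 2 and 5 were loaded, but the read only covered IDs up to 7
	// (e.g. capped by the shard's max read level), so the read level is 7.
	s.addTasks(outstanding, []key{2, 5}, 7)
	fmt.Println(s.readLevel, len(outstanding)) // 7 2
}
```

With the explicit level, a read that was capped below the queue's max level (for example by the shard's max read level) no longer jumps the read level straight to `maxLevel`.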
4 changes: 2 additions & 2 deletions service/history/queue/interface.go
@@ -55,7 +55,7 @@ type (
State() ProcessingQueueState
Split(ProcessingQueueSplitPolicy) []ProcessingQueue
Merge(ProcessingQueue) []ProcessingQueue
- AddTasks(map[task.Key]task.Task, bool)
+ AddTasks(map[task.Key]task.Task, task.Key)
UpdateAckLevel()
// TODO: add Offload() method
}
@@ -72,7 +72,7 @@ type (
Level() int
Queues() []ProcessingQueue
ActiveQueue() ProcessingQueue
- AddTasks(map[task.Key]task.Task, bool)
+ AddTasks(map[task.Key]task.Task, task.Key)
UpdateAckLevels()
Split(ProcessingQueueSplitPolicy) []ProcessingQueue
Merge([]ProcessingQueue)
4 changes: 2 additions & 2 deletions service/history/queue/interface_mock.go

Generated mock file; diff not rendered.

16 changes: 7 additions & 9 deletions service/history/queue/processing_queue.go
@@ -229,7 +229,7 @@ func (q *processingQueueImpl) Merge(

func (q *processingQueueImpl) AddTasks(
tasks map[task.Key]task.Task,
- more bool,
+ newReadLevel task.Key,
) {
for key, task := range tasks {
if _, loaded := q.outstandingTasks[key]; loaded {
@@ -247,14 +247,9 @@ func (q *processingQueueImpl) AddTasks(
}

q.outstandingTasks[key] = task
- if q.state.readLevel.Less(key) {
- q.state.readLevel = key
- }
}

- if !more {
- q.state.readLevel = q.state.maxLevel
- }
+ q.state.readLevel = newReadLevel
}

func (q *processingQueueImpl) UpdateAckLevel() {
@@ -276,9 +271,12 @@ func (q *processingQueueImpl) UpdateAckLevel() {
delete(q.outstandingTasks, key)
}

- if len(q.outstandingTasks) == 0 && q.state.readLevel == q.state.maxLevel {
- q.state.ackLevel = q.state.maxLevel
+ if len(q.outstandingTasks) == 0 {
+ q.state.ackLevel = q.state.readLevel
}

+ // TODO: add a check for specifically for timer task key
+ // and override the taskID field for timer task key to 0.
}

func splitProcessingQueue(
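
A hedged sketch of the `UpdateAckLevel()` behavior above, with simplified types and invented helper names: once nothing is outstanding, the ack level follows the read level, which may legitimately sit below the max level.

```go
package main

import (
	"fmt"
	"sort"
)

type queueState struct {
	ackLevel  int
	readLevel int
	maxLevel  int
}

// updateAckLevel walks outstanding tasks in key order, advances the ack level
// past every acked task, and, once nothing is outstanding, moves it up to the
// read level -- the highest point this queue has actually read.
func updateAckLevel(s *queueState, outstanding map[int]bool) {
	keys := make([]int, 0, len(outstanding))
	for k := range outstanding {
		keys = append(keys, k)
	}
	sort.Ints(keys)
	for _, k := range keys {
		if !outstanding[k] { // not yet acked: stop here
			break
		}
		s.ackLevel = k
		delete(outstanding, k)
	}
	if len(outstanding) == 0 {
		s.ackLevel = s.readLevel
	}
}

func main() {
	s := &queueState{ackLevel: 1, readLevel: 9, maxLevel: 10}
	acked := map[int]bool{2: true, 3: true, 5: true, 9: true} // key -> acked
	updateAckLevel(s, acked)
	fmt.Println(s.ackLevel) // 9: the read level, not the max level (10)
}
```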
7 changes: 4 additions & 3 deletions service/history/queue/processing_queue_collection.go
@@ -63,11 +63,12 @@ func (c *processingQueueCollection) ActiveQueue() ProcessingQueue {

func (c *processingQueueCollection) AddTasks(
tasks map[task.Key]task.Task,
- more bool,
+ newReadLevel task.Key,
) {
- c.ActiveQueue().AddTasks(tasks, more)
+ activeQueue := c.ActiveQueue()
+ activeQueue.AddTasks(tasks, newReadLevel)

- if !more {
+ if taskKeyEquals(activeQueue.State().ReadLevel(), activeQueue.State().MaxLevel()) {
c.resetActiveQueue()
}
}
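
A small sketch of the collection-level decision above (invented names, not the real ProcessingQueueCollection): the active queue is only reset once its read level has reached its max level, regardless of whether the last batch happened to be full.

```go
package main

import "fmt"

type sketchQueue struct {
	readLevel int
	maxLevel  int
}

type sketchCollection struct {
	queues    []*sketchQueue
	activeIdx int
}

// addTasks advances the active queue's read level; once that queue has been
// read all the way to its max level, the collection moves on to the next queue.
func (c *sketchCollection) addTasks(newReadLevel int) {
	active := c.queues[c.activeIdx]
	active.readLevel = newReadLevel
	if active.readLevel == active.maxLevel && c.activeIdx+1 < len(c.queues) {
		c.activeIdx++
	}
}

func main() {
	c := &sketchCollection{queues: []*sketchQueue{
		{readLevel: 3, maxLevel: 10},
		{readLevel: 10, maxLevel: 20},
	}}
	c.addTasks(9)
	fmt.Println(c.activeIdx) // 0: still more to read in the first queue
	c.addTasks(10)
	fmt.Println(c.activeIdx) // 1: first queue exhausted, second becomes active
}
```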
20 changes: 12 additions & 8 deletions service/history/queue/processing_queue_collection_test.go
@@ -106,42 +106,46 @@ func (s *processingQueueCollectionSuite) TestNewCollection_OutOfOrderQueues() {
s.True(s.isQueuesSorted(queueCollection.queues))
}

- func (s *processingQueueCollectionSuite) TestAddTasks_WithMoreTasks() {
+ func (s *processingQueueCollectionSuite) TestAddTasks_ReadNotFinished() {
totalQueues := 4
currentActiveIdx := 1
+ newReadLevel := &testKey{ID: 9}

mockQueues := []*MockProcessingQueue{}
for i := 0; i != totalQueues; i++ {
mockQueues = append(mockQueues, NewMockProcessingQueue(s.controller))
}
- mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), true).Times(1)
+ mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), newReadLevel).Times(1)
mockQueues[currentActiveIdx].EXPECT().State().Return(newProcessingQueueState(
s.level,
&testKey{ID: 3},
- &testKey{ID: 10},
+ newReadLevel,
&testKey{ID: 10},
DomainFilter{},
)).AnyTimes()

queueCollection := s.newTestProcessingQueueCollection(s.level, mockQueues)
queueCollection.activeQueue = mockQueues[currentActiveIdx]

- queueCollection.AddTasks(map[task.Key]task.Task{}, true)
+ queueCollection.AddTasks(map[task.Key]task.Task{}, newReadLevel)
s.Equal(mockQueues[currentActiveIdx].State(), queueCollection.ActiveQueue().State())
}

- func (s *processingQueueCollectionSuite) TestAddTask_NoMoreTasks() {
+ func (s *processingQueueCollectionSuite) TestAddTask_ReadFinished() {
totalQueues := 4
currentActiveIdx := 1
+ newReadLevel := &testKey{ID: 10}

mockQueues := []*MockProcessingQueue{}
for i := 0; i != totalQueues; i++ {
mockQueues = append(mockQueues, NewMockProcessingQueue(s.controller))
}
- mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), false).Times(1)
+ mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), newReadLevel).Times(1)
for i := 0; i != totalQueues; i++ {
mockQueues[i].EXPECT().State().Return(newProcessingQueueState(
s.level,
&testKey{ID: 3},
- &testKey{ID: 10},
+ newReadLevel,
&testKey{ID: 10},
DomainFilter{},
)).AnyTimes()
@@ -150,7 +154,7 @@ func (s *processingQueueCollectionSuite) TestAddTask_NoMoreTasks() {
queueCollection := s.newTestProcessingQueueCollection(s.level, mockQueues)
queueCollection.activeQueue = mockQueues[currentActiveIdx]

- queueCollection.AddTasks(map[task.Key]task.Task{}, false)
+ queueCollection.AddTasks(map[task.Key]task.Task{}, newReadLevel)
s.Nil(queueCollection.ActiveQueue())
}

54 changes: 11 additions & 43 deletions service/history/queue/processing_queue_test.go
@@ -70,7 +70,7 @@ func (s *processingQueueSuite) TearDownTest() {
s.controller.Finish()
}

- func (s *processingQueueSuite) TestAddTasks_WithMoreTasks() {
+ func (s *processingQueueSuite) TestAddTasks() {
ackLevel := &testKey{ID: 1}
maxLevel := &testKey{ID: 10}

@@ -99,51 +99,18 @@ func (s *processingQueueSuite) TestAddTasks_WithMoreTasks() {
make(map[task.Key]task.Task),
)

- queue.AddTasks(tasks, true)
+ newReadLevel := &testKey{ID: 10}
+ queue.AddTasks(tasks, newReadLevel)
s.Len(queue.outstandingTasks, len(taskKeys))
- s.Equal(taskKeys[len(taskKeys)-1], queue.state.readLevel)
+ s.Equal(newReadLevel, queue.state.readLevel)

// add the same set of tasks again, should have no effect
- queue.AddTasks(tasks, true)
+ queue.AddTasks(tasks, newReadLevel)
s.Len(queue.outstandingTasks, len(taskKeys))
- s.Equal(taskKeys[len(taskKeys)-1], queue.state.readLevel)
+ s.Equal(newReadLevel, queue.state.readLevel)
}

- func (s *processingQueueSuite) TestAddTasks_NoMoreTasks() {
- ackLevel := &testKey{ID: 1}
- maxLevel := &testKey{ID: 10}
-
- taskKeys := []task.Key{
- &testKey{ID: 2},
- &testKey{ID: 3},
- &testKey{ID: 5},
- &testKey{ID: 9},
- }
- tasks := make(map[task.Key]task.Task)
- for _, key := range taskKeys {
- mockTask := task.NewMockTask(s.controller)
- mockTask.EXPECT().GetDomainID().Return("some random domainID").AnyTimes()
- mockTask.EXPECT().GetWorkflowID().Return("some random workflowID").AnyTimes()
- mockTask.EXPECT().GetRunID().Return("some random runID").AnyTimes()
- mockTask.EXPECT().GetTaskType().Return(0).AnyTimes()
- tasks[key] = mockTask
- }
-
- queue := s.newTestProcessingQueue(
- 0,
- ackLevel,
- ackLevel,
- maxLevel,
- NewDomainFilter(nil, true),
- make(map[task.Key]task.Task),
- )
-
- queue.AddTasks(tasks, false)
- s.Len(queue.outstandingTasks, len(taskKeys))
- s.Equal(maxLevel, queue.state.readLevel)
- }

- func (s *processingQueueSuite) TestUpdateAckLevel_WithMoreTasks() {
+ func (s *processingQueueSuite) TestUpdateAckLevel_WithPendingTasks() {
ackLevel := &testKey{ID: 1}
maxLevel := &testKey{ID: 10}

@@ -181,8 +148,9 @@ func (s *processingQueueSuite) TestUpdateAckLevel_WithMoreTasks() {
s.Equal(&testKey{ID: 3}, queue.state.ackLevel)
}

- func (s *processingQueueSuite) TestUpdateAckLevel_NoMoreTasks() {
+ func (s *processingQueueSuite) TestUpdateAckLevel_NoPendingTasks() {
ackLevel := &testKey{ID: 1}
+ readLevel := &testKey{ID: 9}
maxLevel := &testKey{ID: 10}

taskKeys := []task.Key{
@@ -207,14 +175,14 @@ func (s *processingQueueSuite) TestUpdateAckLevel_NoMoreTasks() {
queue := s.newTestProcessingQueue(
0,
ackLevel,
- maxLevel,
+ readLevel,
maxLevel,
NewDomainFilter(nil, true),
tasks,
)

queue.UpdateAckLevel()
- s.Equal(maxLevel, queue.state.ackLevel)
+ s.Equal(readLevel, queue.state.ackLevel)
}

func (s *processingQueueSuite) TestSplit() {
2 changes: 1 addition & 1 deletion service/history/queue/split_policy_test.go
@@ -24,7 +24,7 @@ import (
"sort"
"testing"

gomock "github.com/golang/mock/gomock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

53 changes: 35 additions & 18 deletions service/history/queue/transfer_queue_processor_base.go
@@ -42,6 +42,8 @@ import (

var (
loadQueueTaskThrottleRetryDelay = 5 * time.Second

+ persistenceOperationRetryPolicy = common.CreatePersistanceRetryPolicy()
)

type (
@@ -298,13 +300,20 @@ func (t *transferQueueProcessorBase) processBatch() {
t.queueCollectionsLock.RLock()
activeQueue := queueCollection.ActiveQueue()
if activeQueue == nil {
+ t.queueCollectionsLock.RUnlock()
continue
}

readLevel := activeQueue.State().ReadLevel()
- maxLevel := activeQueue.State().MaxLevel()
+ maxReadLevel := activeQueue.State().MaxLevel()
t.queueCollectionsLock.RUnlock()

- transferTaskInfos, more, partialRead, err := t.readTasks(readLevel, maxLevel)
+ shardMaxReadLevel := t.maxReadLevel()
+ if shardMaxReadLevel.Less(maxReadLevel) {
+ maxReadLevel = shardMaxReadLevel
+ }
+
+ transferTaskInfos, more, err := t.readTasks(readLevel, maxReadLevel)
if err != nil {
t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
t.notifyNewTask() // re-enqueue the event
@@ ... @@
}
}

+ var newReadLevel task.Key
+ if !more {
+ newReadLevel = maxReadLevel
+ } else {
+ newReadLevel = newTransferTaskKey(transferTaskInfos[len(transferTaskInfos)-1].GetTaskID())
+ }
t.queueCollectionsLock.Lock()
- queueCollection.AddTasks(tasks, more || partialRead)
+ queueCollection.AddTasks(tasks, newReadLevel)
+ newActiveQueue := queueCollection.ActiveQueue()
t.queueCollectionsLock.Unlock()

- if more {
+ if more || (newActiveQueue != nil && newActiveQueue != activeQueue) {
+ // more tasks for the current active queue or the active queue has changed
t.notifyNewTask()
}
}
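
The hunk above re-notifies the processor when the active queue changes, so the next queue's range is read without waiting for another event. A minimal sketch of that kind of non-blocking notification; the names and the channel size are illustrative assumptions, not the actual notifyNewTask implementation:

```go
package main

import "fmt"

type sketchProcessor struct {
	notifyCh chan struct{}
}

// notifyNewTask is a non-blocking nudge: with a buffer of one, repeated nudges
// collapse into a single pending signal for the processing loop.
func (p *sketchProcessor) notifyNewTask() {
	select {
	case p.notifyCh <- struct{}{}:
	default:
	}
}

func main() {
	p := &sketchProcessor{notifyCh: make(chan struct{}, 1)}

	more := false                      // current queue fully read
	oldActive, newActive := "q0", "q1" // active queue advanced after AddTasks

	if more || (newActive != "" && newActive != oldActive) {
		p.notifyNewTask() // start reading the new active queue without waiting
	}
	fmt.Println(len(p.notifyCh)) // 1 pending signal
}
```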
@@ -434,25 +451,25 @@ func (t *transferQueueProcessorBase) getProcessingQueueStates() []ProcessingQueueState {
func (t *transferQueueProcessorBase) readTasks(
readLevel task.Key,
maxReadLevel task.Key,
- ) ([]*persistence.TransferTaskInfo, bool, bool, error) {
- shardMaxReadLevel := t.maxReadLevel()
- partialRead := false
- if shardMaxReadLevel.Less(maxReadLevel) {
- partialRead = true
- maxReadLevel = shardMaxReadLevel
+ ) ([]*persistence.TransferTaskInfo, bool, error) {
+
+ var response *persistence.GetTransferTasksResponse
+ op := func() error {
+ var err error
+ response, err = t.shard.GetExecutionManager().GetTransferTasks(&persistence.GetTransferTasksRequest{
+ ReadLevel: readLevel.(*transferTaskKey).taskID,
+ MaxReadLevel: maxReadLevel.(*transferTaskKey).taskID,
+ BatchSize: t.options.BatchSize(),
+ })
+ return err
}

- response, err := t.shard.GetExecutionManager().GetTransferTasks(&persistence.GetTransferTasksRequest{
- ReadLevel: readLevel.(*transferTaskKey).taskID,
- MaxReadLevel: maxReadLevel.(*transferTaskKey).taskID,
- BatchSize: t.options.BatchSize(),
- })

+ err := backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
if err != nil {
- return nil, false, false, err
+ return nil, false, err
}

- return response.Tasks, len(response.NextPageToken) != 0, partialRead, nil
+ return response.Tasks, len(response.NextPageToken) != 0, nil
}

func (t *transferQueueProcessorBase) submitTask(
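
readTasks() is now wrapped in backoff.Retry with a persistence retry policy and a transient-error predicate, so a flaky read is retried instead of dropped. As a rough illustration of that pattern, here is a self-contained retry-with-backoff helper; the names, signature, and backoff schedule are invented for the example and are not Cadence's common/backoff API:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry runs op, sleeping with exponential backoff between attempts, until it
// succeeds, hits a non-retryable error, or exhausts maxAttempts.
func retry(op func() error, maxAttempts int, initial time.Duration, isRetryable func(error) bool) error {
	delay := initial
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = op(); err == nil || !isRetryable(err) {
			return err
		}
		time.Sleep(delay)
		delay *= 2
	}
	return err
}

var errTransient = errors.New("transient persistence error")

func main() {
	attempts := 0
	// Fails twice with a transient error, then succeeds -- mimicking a flaky
	// GetTransferTasks call that is now retried instead of aborting the batch.
	op := func() error {
		attempts++
		if attempts < 3 {
			return errTransient
		}
		return nil
	}
	err := retry(op, 5, 10*time.Millisecond, func(e error) bool { return errors.Is(e, errTransient) })
	fmt.Println(attempts, err) // 3 <nil>
}
```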
1 changed file not shown.
