Skip to content

Commit

Permalink
Multicursor Queue Processor Bug Fix (#3508)
Browse files Browse the repository at this point in the history
* Fix bug on merging processing queue and processing queue collection
* Prevent ack level to move beyond read level
  • Loading branch information
yycptt authored Sep 14, 2020
1 parent 4d4482e commit 57a20dc
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 23 deletions.
1 change: 0 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2818,7 +2818,6 @@ func (e *historyEngineImpl) DescribeTransferQueue(
ctx context.Context,
clusterName string,
) (*workflow.DescribeQueueResponse, error) {
fmt.Println("described queue: shardID: ", e.shard.GetShardID())
return e.describeQueue(e.txProcessor, clusterName)
}

Expand Down
36 changes: 36 additions & 0 deletions service/history/queue/processing_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func newProcessingQueue(
queue.state = copyQueueState(state)
}

if queue.state.readLevel.Less(queue.state.ackLevel) {
logger.Fatal("ack level larger than readlevel when creating processing queue", tag.Error(
fmt.Errorf("ack level: %v, read level: %v", queue.state.ackLevel, queue.state.readLevel),
))
}

return queue
}

Expand Down Expand Up @@ -224,15 +230,31 @@ func (q *processingQueueImpl) Merge(
q1.state.domainFilter.Merge(q2.state.domainFilter),
))

for _, state := range newQueueStates {
if state.ReadLevel().Less(state.AckLevel()) || state.MaxLevel().Less(state.ReadLevel()) {
q.logger.Fatal("invalid processing queue merge result", tag.Error(
fmt.Errorf("q1: %v, q2: %v, merge result: %v", q1.state, q2.state, newQueueStates),
))
}
}

return splitProcessingQueue([]*processingQueueImpl{q1, q2}, newQueueStates, q.logger, q.metricsClient)
}

func (q *processingQueueImpl) AddTasks(
tasks map[task.Key]task.Task,
newReadLevel task.Key,
) {
if newReadLevel.Less(q.state.readLevel) {
q.logger.Fatal("processing queue read level moved backward", tag.Error(
fmt.Errorf("current read level: %v, new read level: %v", q.state.readLevel, newReadLevel),
))
}

for key, task := range tasks {
if _, loaded := q.outstandingTasks[key]; loaded {
// TODO: this means the task has been submitted before, we should mark the task state accordingly and
// do not submit this task again in transfer/timer queue processor base
q.logger.Debug(fmt.Sprintf("Skipping task: %+v. DomainID: %v, WorkflowID: %v, RunID: %v, Type: %v",
key, task.GetDomainID(), task.GetWorkflowID(), task.GetRunID(), task.GetTaskType()))
continue
Expand Down Expand Up @@ -263,6 +285,14 @@ func (q *processingQueueImpl) UpdateAckLevel() (task.Key, int) {
})

for _, key := range keys {
if q.state.readLevel.Less(key) {
// this can happen as during merge read level can move backward
// besides that for time task key, readLevel is expected to be less than task key
// as the taskID for read level is always 0. This means we can potentially buffer
// more timer tasks in memory. If this becomes a problem, we can change this logic.
break
}

if q.outstandingTasks[key].State() != t.TaskStateAcked {
break
}
Expand All @@ -279,6 +309,12 @@ func (q *processingQueueImpl) UpdateAckLevel() (task.Key, int) {
q.state.ackLevel = newTimerTaskKey(timerKey.visibilityTimestamp, 0)
}

if q.state.readLevel.Less(q.state.ackLevel) {
q.logger.Fatal("ack level moved beyond read level", tag.Error(
fmt.Errorf("processing queue state: %v", q.state),
))
}

return q.state.ackLevel, len(q.outstandingTasks)
}

Expand Down
28 changes: 23 additions & 5 deletions service/history/queue/processing_queue_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package queue

import (
"fmt"
"sort"

"github.com/uber/cadence/service/history/task"
Expand Down Expand Up @@ -145,16 +146,22 @@ func (c *processingQueueCollection) Merge(
newQueues = append(newQueues, mergedQueues[:len(mergedQueues)-1]...)

lastMergedQueue := mergedQueues[len(mergedQueues)-1]
if currentQueueIdx+1 == len(c.queues) ||
!c.queues[currentQueueIdx+1].State().AckLevel().Less(lastMergedQueue.State().MaxLevel()) {
overlapWithCurrentQueue := currentQueueIdx+1 != len(c.queues) &&
c.queues[currentQueueIdx+1].State().AckLevel().Less(lastMergedQueue.State().MaxLevel())
overlapWithIncomingQueue := incomingQueueIdx+1 != len(incomingQueues) &&
incomingQueues[incomingQueueIdx+1].State().AckLevel().Less(lastMergedQueue.State().MaxLevel())

if !overlapWithCurrentQueue && !overlapWithIncomingQueue {
newQueues = append(newQueues, lastMergedQueue)
incomingQueueIdx++
} else {
currentQueueIdx++
} else if overlapWithCurrentQueue {
incomingQueues[incomingQueueIdx] = lastMergedQueue
currentQueueIdx++
} else {
c.queues[currentQueueIdx] = lastMergedQueue
incomingQueueIdx++
}

currentQueueIdx++
}

if incomingQueueIdx < len(incomingQueues) {
Expand All @@ -167,6 +174,17 @@ func (c *processingQueueCollection) Merge(

c.queues = newQueues

// make sure the result is ordered and disjoint
for idx := 0; idx < len(c.queues)-1; idx++ {
if c.queues[idx+1].State().AckLevel().Less(c.queues[idx].State().MaxLevel()) {
errMsg := ""
for _, q := range c.queues {
errMsg += fmt.Sprintf("%v ", q)
}
panic("invalid processing queue merge result: " + errMsg)
}
}

c.resetActiveQueue()
}

Expand Down
113 changes: 113 additions & 0 deletions service/history/queue/processing_queue_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,119 @@ func (s *processingQueueCollectionSuite) TestMerge() {
),
},
},
{
currentQueueStates: []ProcessingQueueState{
newProcessingQueueState(
s.level,
testKey{ID: 10},
testKey{ID: 15},
testKey{ID: 20},
DomainFilter{DomainIDs: map[string]struct{}{"domain1": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 30},
testKey{ID: 40},
testKey{ID: 50},
DomainFilter{DomainIDs: map[string]struct{}{"domain2": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 60},
testKey{ID: 65},
testKey{ID: 70},
DomainFilter{DomainIDs: map[string]struct{}{"domain2": {}}},
),
},
incomingQueueStates: []ProcessingQueueState{
newProcessingQueueState(
s.level,
testKey{ID: 0},
testKey{ID: 5},
testKey{ID: 15},
DomainFilter{DomainIDs: map[string]struct{}{"domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 18},
testKey{ID: 18},
testKey{ID: 100},
DomainFilter{DomainIDs: map[string]struct{}{"domain3": {}}},
),
},
expectedActiveQueueState: newProcessingQueueState(
s.level,
testKey{ID: 0},
testKey{ID: 5},
testKey{ID: 10},
DomainFilter{DomainIDs: map[string]struct{}{"domain3": {}}},
),
expectedNewQueueStates: []ProcessingQueueState{
newProcessingQueueState(
s.level,
testKey{ID: 0},
testKey{ID: 5},
testKey{ID: 10},
DomainFilter{DomainIDs: map[string]struct{}{"domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 10},
testKey{ID: 10},
testKey{ID: 15},
DomainFilter{DomainIDs: map[string]struct{}{"domain1": {}, "domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 15},
testKey{ID: 15},
testKey{ID: 18},
DomainFilter{DomainIDs: map[string]struct{}{"domain1": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 18},
testKey{ID: 18},
testKey{ID: 20},
DomainFilter{DomainIDs: map[string]struct{}{"domain1": {}, "domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 20},
testKey{ID: 20},
testKey{ID: 30},
DomainFilter{DomainIDs: map[string]struct{}{"domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 30},
testKey{ID: 30},
testKey{ID: 50},
DomainFilter{DomainIDs: map[string]struct{}{"domain2": {}, "domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 50},
testKey{ID: 50},
testKey{ID: 60},
DomainFilter{DomainIDs: map[string]struct{}{"domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 60},
testKey{ID: 60},
testKey{ID: 70},
DomainFilter{DomainIDs: map[string]struct{}{"domain2": {}, "domain3": {}}},
),
newProcessingQueueState(
s.level,
testKey{ID: 70},
testKey{ID: 70},
testKey{ID: 100},
DomainFilter{DomainIDs: map[string]struct{}{"domain3": {}}},
),
},
},
}

for _, tc := range testCases {
Expand Down
41 changes: 41 additions & 0 deletions service/history/queue/processing_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,47 @@ func (s *processingQueueSuite) TestUpdateAckLevel_NoPendingTasks() {
s.Equal(0, pendingTasks)
}

func (s *processingQueueSuite) TestUpdateAckLevel_TaskKeyLargerThanReadLevel() {
ackLevel := testKey{ID: 1}
readLevel := testKey{ID: 5}
maxLevel := testKey{ID: 10}

taskKeys := []task.Key{
testKey{ID: 2},
testKey{ID: 3},
testKey{ID: 5},
testKey{ID: 8},
testKey{ID: 9},
}
taskStates := []t.State{
t.TaskStateAcked,
t.TaskStateAcked,
t.TaskStateAcked,
t.TaskStateAcked,
t.TaskStatePending,
}
tasks := make(map[task.Key]task.Task)
for i, key := range taskKeys {
task := task.NewMockTask(s.controller)
task.EXPECT().State().Return(taskStates[i]).MaxTimes(1)
tasks[key] = task
}

queue := s.newTestProcessingQueue(
0,
ackLevel,
readLevel,
maxLevel,
NewDomainFilter(nil, true),
tasks,
)

newAckLevel, pendingTasks := queue.UpdateAckLevel()
s.Equal(readLevel, newAckLevel)
s.Equal(readLevel, queue.state.ackLevel)
s.Equal(2, pendingTasks)
}

func (s *processingQueueSuite) TestSplit() {
testCases := []struct {
queue *processingQueueImpl
Expand Down
40 changes: 23 additions & 17 deletions service/history/queue/processor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package queue

import (
"fmt"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -275,30 +274,40 @@ func (p *processorBase) splitProcessingQueueCollection(
return
}

newQueuesMap := make(map[int][]ProcessingQueue)
newQueuesMap := make(map[int][][]ProcessingQueue)
for _, queueCollection := range p.processingQueueCollections {
currentNewQueuesMap := make(map[int][]ProcessingQueue)
newQueues := queueCollection.Split(splitPolicy)
for _, newQueue := range newQueues {
newQueueLevel := newQueue.State().Level()
newQueuesMap[newQueueLevel] = append(newQueuesMap[newQueueLevel], newQueue)
currentNewQueuesMap[newQueueLevel] = append(currentNewQueuesMap[newQueueLevel], newQueue)
}

if queuesToMerge, ok := newQueuesMap[queueCollection.Level()]; ok {
queueCollection.Merge(queuesToMerge)
delete(newQueuesMap, queueCollection.Level())
for newQueueLevel, queues := range currentNewQueuesMap {
newQueuesMap[newQueueLevel] = append(newQueuesMap[newQueueLevel], queues)
}
}

for level, newQueues := range newQueuesMap {
p.processingQueueCollections = append(p.processingQueueCollections, NewProcessingQueueCollection(
level,
newQueues,
))
for _, queueCollection := range p.processingQueueCollections {
if queuesList, ok := newQueuesMap[queueCollection.Level()]; ok {
for _, queues := range queuesList {
queueCollection.Merge(queues)
}
}
delete(newQueuesMap, queueCollection.Level())
}

sort.Slice(p.processingQueueCollections, func(i, j int) bool {
return p.processingQueueCollections[i].Level() < p.processingQueueCollections[j].Level()
})
for level, newQueuesList := range newQueuesMap {
newQueueCollection := NewProcessingQueueCollection(
level,
[]ProcessingQueue{},
)
for _, newQueues := range newQueuesList {
newQueueCollection.Merge(newQueues)
}
p.processingQueueCollections = append(p.processingQueueCollections, newQueueCollection)
delete(newQueuesMap, level)
}

// there can be new queue collections created or new queues added to an existing collection
for _, queueCollections := range p.processingQueueCollections {
Expand Down Expand Up @@ -471,9 +480,6 @@ func newProcessingQueueCollections(
queues,
))
}
sort.Slice(processingQueueCollections, func(i, j int) bool {
return processingQueueCollections[i].Level() < processingQueueCollections[j].Level()
})

return processingQueueCollections
}
Expand Down
7 changes: 7 additions & 0 deletions service/history/queue/split_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package queue

import (
"fmt"
"math/rand"

"github.com/uber/cadence/common/log"
Expand Down Expand Up @@ -424,6 +425,12 @@ func splitQueueHelper(
))
}

for _, state := range newQueueStates {
if state.ReadLevel().Less(state.AckLevel()) || state.MaxLevel().Less(state.ReadLevel()) {
panic(fmt.Sprintf("invalid processing queue split result: %v, state before split: %v, newMaxLevel: %v", state, queueImpl.state, newMaxLevel))
}
}

return newQueueStates
}

Expand Down

0 comments on commit 57a20dc

Please sign in to comment.