Enabling service-discovery driven shutdown of tasklists in matching engine (#6198)

What changed?

Matching hosts are currently at significant risk of delayed processing or contention during shard ownership changes, particularly when the service-discovery change lands before the container shutdown is kicked off.

This change is more proactive about cleaning up and guarding against shard changes, so that a Matching host relinquishes ownership and processing of a tasklist manager it has already lost, rather than fighting with the new owner by incrementing the shard counter and delaying processing.

This can happen in a few ways. One example: a host shutdown takes a while, the new host reacts to service-discovery and takes ownership of the original shards, but then runs into a task reader on the old host that is still trying to take the lock. Another is a scale-up event, where shards are stolen from an existing host.
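To make that concrete, below is a minimal, self-contained sketch of the shutdown-aware subscription loop this change introduces (the real engine starts it via go e.subscribeToMembershipChanges() in NewEngine). The ChangeEvent stand-in type, the changes channel, and the loop body are illustrative assumptions rather than the committed implementation; only the shutdown channel and WaitGroup pattern mirror the diff.

package main

import (
	"fmt"
	"sync"
)

// ChangeEvent is a stand-in for membership.ChangeEvent.
type ChangeEvent struct{ HostsUpdated []string }

type engine struct {
	shutdown           chan struct{}
	shutdownCompletion *sync.WaitGroup
	changes            chan *ChangeEvent // assumed to be fed by the membership resolver's Subscribe
}

// subscribeToMembershipChanges drains hashring change events until shutdown.
// On each event the real engine would re-check ownership of its loaded
// tasklist managers and stop the ones whose shards now belong to another host.
func (e *engine) subscribeToMembershipChanges() {
	defer e.shutdownCompletion.Done()
	for {
		select {
		case ev := <-e.changes:
			fmt.Println("membership changed; shedding tasklists lost to:", ev.HostsUpdated)
		case <-e.shutdown:
			return
		}
	}
}

func main() {
	e := &engine{
		shutdown:           make(chan struct{}),
		shutdownCompletion: &sync.WaitGroup{},
		changes:            make(chan *ChangeEvent), // unbuffered so the send below is observed before shutdown
	}
	e.shutdownCompletion.Add(1)
	go e.subscribeToMembershipChanges()

	e.changes <- &ChangeEvent{HostsUpdated: []string{"matching-host-2"}}
	close(e.shutdown)           // same pattern matchingEngineImpl.Stop() uses
	e.shutdownCompletion.Wait() // wait for the subscriber goroutine to exit
}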

Why?

How did you test it?

This has been tested by deploying it to a couple of pre-production environments over a few iterations, but at this point it still needs some more manual testing.

Testing currently:

 Preliminary testing in development environments
 Unit testing
 Manual testing in staging
Feature enablement

This feature is gated behind the boolean dynamic config flag matching.enableTasklistGuardAgainstOwnershipLoss.
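In production the flag is read through dynamic config; for unit tests the property can be stubbed directly, as defaultTestConfig() does in this change. A minimal sketch follows, assuming only what this diff shows about the matching Config struct and the BoolPropertyFn shape; the override shown is a test convenience, not the production path.

package main

import (
	"fmt"

	"github.com/uber/cadence/common/dynamicconfig"
	matchingconfig "github.com/uber/cadence/service/matching/config"
)

func main() {
	// A config built via matchingconfig.NewConfig would resolve
	// EnableTasklistOwnershipGuard from the dynamic config key
	// matching.enableTasklistGuardAgainstOwnershipLoss. For a test,
	// the boolean property can simply be overridden:
	cfg := &matchingconfig.Config{}
	cfg.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { return true }

	fmt.Println("ownership guard enabled:", cfg.EnableTasklistOwnershipGuard())
}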
davidporter-id-au authored Aug 15, 2024
1 parent 952af5a commit b1c923e
Showing 13 changed files with 691 additions and 81 deletions.
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
@@ -1672,6 +1672,13 @@ const (
// Default value: false
// Allowed filters: DomainID
MatchingEnableTaskInfoLogByDomainID
// MatchingEnableTasklistGuardAgainstOwnershipShardLoss
// enables guards to prevent tasklists from processing if there is any detection that the host
// no longer is active or owns the shard
// KeyName: matching.enableTasklistGuardAgainstOwnershipLoss
// Value type: Bool
// Default value: false
MatchingEnableTasklistGuardAgainstOwnershipShardLoss

// key for history

@@ -4116,6 +4123,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "MatchingEnableTaskInfoLogByDomainID is enables info level logs for decision/activity task based on the request domainID",
DefaultValue: false,
},
MatchingEnableTasklistGuardAgainstOwnershipShardLoss: {
KeyName: "matching.enableTasklistGuardAgainstOwnershipLoss",
Description: "allows guards to ensure that tasklists don't continue processing if there's signal that they've lost ownership",
DefaultValue: false,
},
EventsCacheGlobalEnable: {
KeyName: "history.eventsCacheGlobalEnable",
Description: "EventsCacheGlobalEnable is enables global cache over all history shards",
10 changes: 5 additions & 5 deletions common/errors/taskListNotOwnedByHostError.go
@@ -24,21 +24,21 @@ package errors

import "fmt"

var _ error = &TaskListNotOwnnedByHostError{}
var _ error = &TaskListNotOwnedByHostError{}

type TaskListNotOwnnedByHostError struct {
type TaskListNotOwnedByHostError struct {
OwnedByIdentity string
MyIdentity string
TasklistName string
}

func (m *TaskListNotOwnnedByHostError) Error() string {
func (m *TaskListNotOwnedByHostError) Error() string {
return fmt.Sprintf("task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s",
m.OwnedByIdentity, m.MyIdentity, m.TasklistName)
}

func NewTaskListNotOwnnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnnedByHostError {
return &TaskListNotOwnnedByHostError{
func NewTaskListNotOwnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnedByHostError {
return &TaskListNotOwnedByHostError{
OwnedByIdentity: ownedByIdentity,
MyIdentity: myIdentity,
TasklistName: tasklistName,
6 changes: 6 additions & 0 deletions common/log/tag/tags.go
@@ -956,6 +956,12 @@ func VisibilityQuery(query string) Tag {
return newStringTag("visibility-query", query)
}

// MembershipChangeEvent is a predefined tag for when logging hashring change events,
// expected to be of type membership.ChangeEvent
func MembershipChangeEvent(event interface{}) Tag {
return newPredefinedDynamicTag("membership-change-event", event)
}

// Dynamic Uses reflection based logging for arbitrary values
// for not very performant logging
func Dynamic(key string, v interface{}) Tag {
9 changes: 9 additions & 0 deletions common/membership/resolver.go
@@ -25,6 +25,7 @@ package membership

import (
"fmt"
"sync"
"sync/atomic"

"github.com/uber/cadence/common"
@@ -84,6 +85,7 @@ type MultiringResolver struct {
status int32

provider PeerProvider
mu sync.Mutex
rings map[string]*ring
}

@@ -110,6 +112,7 @@ func NewMultiringResolver(
provider: provider,
rings: make(map[string]*ring),
metrics: metricsClient,
mu: sync.Mutex{},
}

for _, s := range services {
@@ -130,6 +133,8 @@ func (rpo *MultiringResolver) Start() {

rpo.provider.Start()

rpo.mu.Lock()
defer rpo.mu.Unlock()
for _, ring := range rpo.rings {
ring.Start()
}
@@ -145,6 +150,8 @@ func (rpo *MultiringResolver) Stop() {
return
}

rpo.mu.Lock()
defer rpo.mu.Unlock()
for _, ring := range rpo.rings {
ring.Stop()
}
@@ -163,6 +170,8 @@ func (rpo *MultiringResolver) EvictSelf() error {
}

func (rpo *MultiringResolver) getRing(service string) (*ring, error) {
rpo.mu.Lock()
defer rpo.mu.Unlock()
ring, found := rpo.rings[service]
if !found {
return nil, fmt.Errorf("service %q is not tracked by Resolver", service)
3 changes: 3 additions & 0 deletions service/matching/config/config.go
@@ -81,6 +81,8 @@ type (
TaskDispatchRPSTTL time.Duration
// task gc configuration
MaxTimeBetweenTaskDeletes time.Duration

EnableTasklistOwnershipGuard dynamicconfig.BoolPropertyFn
}

ForwarderConfig struct {
@@ -158,6 +160,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation),
AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()),
AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout),
EnableTasklistOwnershipGuard: dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss),
LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime),
LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime),
HostName: hostName,
1 change: 1 addition & 0 deletions service/matching/config/config_test.go
@@ -79,6 +79,7 @@ func TestNewConfig(t *testing.T) {
"TaskDispatchRPS": {nil, 100000.0},
"TaskDispatchRPSTTL": {nil, time.Minute},
"MaxTimeBetweenTaskDeletes": {nil, time.Second},
"EnableTasklistOwnershipGuard": {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false},
}
client := dynamicconfig.NewInMemoryClient()
for fieldName, expected := range fields {
94 changes: 73 additions & 21 deletions service/matching/handler/engine.go
@@ -78,6 +78,8 @@ type (
}

matchingEngineImpl struct {
shutdownCompletion *sync.WaitGroup
shutdown chan struct{}
taskManager persistence.TaskManager
clusterMetadata cluster.Metadata
historyService history.Client
@@ -120,7 +122,8 @@ var (
var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented

// NewEngine creates an instance of matching engine
func NewEngine(taskManager persistence.TaskManager,
func NewEngine(
taskManager persistence.TaskManager,
clusterMetadata cluster.Metadata,
historyService history.Client,
matchingClient matching.Client,
@@ -132,7 +135,10 @@ func NewEngine(taskManager persistence.TaskManager,
partitioner partition.Partitioner,
timeSource clock.TimeSource,
) Engine {

e := &matchingEngineImpl{
shutdown: make(chan struct{}),
shutdownCompletion: &sync.WaitGroup{},
taskManager: taskManager,
clusterMetadata: clusterMetadata,
historyService: historyService,
@@ -149,19 +155,24 @@ func NewEngine(taskManager persistence.TaskManager,
partitioner: partitioner,
timeSource: timeSource,
}

e.shutdownCompletion.Add(1)
go e.subscribeToMembershipChanges()

e.waitForQueryResultFn = e.waitForQueryResult
return e
}

func (e *matchingEngineImpl) Start() {
// As task lists are initialized lazily nothing is done on startup at this point.
}

func (e *matchingEngineImpl) Stop() {
close(e.shutdown)
// Executes Stop() on each task list outside of lock
for _, l := range e.getTaskLists(math.MaxInt32) {
l.Stop()
}
e.shutdownCompletion.Wait()
}

func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager {
@@ -200,26 +211,9 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
}
e.taskListsLock.RUnlock()

// Defensive check to make sure we actually own the task list
// If we try to create a task list manager for a task list that is not owned by us, return an error
// The new task list manager will steal the task list from the current owner, which should only happen if
// the task list is owned by the current host.
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
err := e.errIfShardLoss(taskList)
if err != nil {
return nil, fmt.Errorf("failed to lookup task list owner: %w", err)
}

self, err := e.membershipResolver.WhoAmI()
if err != nil {
return nil, fmt.Errorf("failed to lookup self im membership: %w", err)
}

if taskListOwner.Identity() != self.Identity() {
return nil, cadence_errors.NewTaskListNotOwnnedByHostError(
taskListOwner.Identity(),
self.Identity(),
taskList.GetName(),
)
return nil, err
}

// If it gets here, write lock and check again in case a task list is created between the two locks
@@ -1202,6 +1196,64 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog(
}
}

func (e *matchingEngineImpl) errIfShardLoss(taskList *tasklist.Identifier) error {
if !e.config.EnableTasklistOwnershipGuard() {
return nil
}

self, err := e.membershipResolver.WhoAmI()
if err != nil {
return fmt.Errorf("failed to lookup self im membership: %w", err)
}

if e.isShuttingDown() {
e.logger.Warn("request to get tasklist is being rejected because engine is shutting down",
tag.WorkflowDomainID(taskList.GetDomainID()),
tag.WorkflowTaskListType(taskList.GetType()),
tag.WorkflowTaskListName(taskList.GetName()),
)

return cadence_errors.NewTaskListNotOwnedByHostError(
"not known",
self.Identity(),
taskList.GetName(),
)
}

// Defensive check to make sure we actually own the task list
// If we try to create a task list manager for a task list that is not owned by us, return an error
// The new task list manager will steal the task list from the current owner, which should only happen if
// the task list is owned by the current host.
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
if err != nil {
return fmt.Errorf("failed to lookup task list owner: %w", err)
}

if taskListOwner.Identity() != self.Identity() {
e.logger.Warn("Request to get tasklist is being rejected because engine does not own this shard",
tag.WorkflowDomainID(taskList.GetDomainID()),
tag.WorkflowTaskListType(taskList.GetType()),
tag.WorkflowTaskListName(taskList.GetName()),
)
return cadence_errors.NewTaskListNotOwnedByHostError(
taskListOwner.Identity(),
self.Identity(),
taskList.GetName(),
)
}

return nil
}

func (e *matchingEngineImpl) isShuttingDown() bool {
select {
case <-e.shutdown:
return true
default:
return false
}
}

func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) {
m.Lock()
defer m.Unlock()
57 changes: 3 additions & 54 deletions service/matching/handler/engine_integration_test.go
@@ -34,7 +34,6 @@ import (

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
@@ -45,7 +44,6 @@ import (
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
cadence_errors "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
@@ -55,6 +53,7 @@ import (
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/matching/config"
"github.com/uber/cadence/service/matching/tasklist"
@@ -131,6 +130,7 @@ func (s *matchingEngineSuite) SetupTest() {
s.mockMembershipResolver = membership.NewMockResolver(s.controller)
s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes()
s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes()
s.mockMembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes()
s.mockIsolationStore = dynamicconfig.NewMockClient(s.controller)
dcClient := dynamicconfig.NewInMemoryClient()
dcClient.UpdateValue(dynamicconfig.EnableTasklistIsolation, true)
@@ -1303,58 +1303,6 @@ func (s *matchingEngineSuite) TestConfigDefaultHostName() {
s.EqualValues(configEmpty.HostName, "")
}

func (s *matchingEngineSuite) TestGetTaskListManager_OwnerShip() {
testCases := []struct {
name string
lookUpResult string
lookUpErr error
whoAmIResult string
whoAmIErr error

expectedError error
}{
{
name: "Not owned by current host",
lookUpResult: "A",
whoAmIResult: "B",
expectedError: new(cadence_errors.TaskListNotOwnnedByHostError),
},
{
name: "LookupError",
lookUpErr: assert.AnError,
expectedError: assert.AnError,
},
{
name: "WhoAmIError",
whoAmIErr: assert.AnError,
expectedError: assert.AnError,
},
}

for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {
resolverMock := membership.NewMockResolver(s.controller)
s.matchingEngine.membershipResolver = resolverMock

resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(
membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr,
).AnyTimes()
resolverMock.EXPECT().WhoAmI().Return(
membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr,
).AnyTimes()

taskListKind := types.TaskListKindNormal

_, err := s.matchingEngine.getTaskListManager(
tasklist.NewTestTaskListID(s.T(), "domain", "tasklist", persistence.TaskListTypeActivity),
&taskListKind,
)

assert.ErrorAs(s.T(), err, &tc.expectedError)
})
}
}

func newActivityTaskScheduledEvent(eventID int64, decisionTaskCompletedEventID int64,
scheduleAttributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent {
historyEvent := newHistoryEvent(eventID, types.EventTypeActivityTaskScheduled)
Expand Down Expand Up @@ -1402,6 +1350,7 @@ func defaultTestConfig() *config.Config {
config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10)
config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)
config.MaxTimeBetweenTaskDeletes = time.Duration(0)
config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { return true }
return config
}

