From b1c923e211ef5c2c543703ca539340f0dcc022f3 Mon Sep 17 00:00:00 2001 From: David Porter Date: Wed, 14 Aug 2024 17:21:14 -0700 Subject: [PATCH] Enabling service-discovery driven shutdown of tasklists in matching engine (#6198) What changed? Matching hosts presently are at quite a lot of risk of delaying processing or getting contended during shard ownership changes, particularly if the service-discovery changes occur before the container shutdown is kicked off. This change attempts to be somewhat more proactive in cleaning up and guarding against shard changes so that the Matching host relinquishes ownership and processing of the tasklist manager it may have lost earlier, rather than fighting with the new owner by incrementing the shard counter and delaying processing. This can happen in a few ways: A host shutdown is taking a while, but the new host is reacting to service-discovery and taking ownership of the original shards, but hitting a taskreader still attempting to take lock, for one example. Another is a scaling-up event where the shards are stolen from an existing host. Why? How did you test it? This has been tested by deploying in a couple of pre-prod envs in a few iterations, but at this point in time needs a bit more manual testing. 
Testing currently: Preliminary testing in development environments Unit testing Manual testing in staging Feature enablement This feature requires the bool flag matching.enableTasklistGuardAgainstOwnershipLoss --- common/dynamicconfig/constants.go | 12 + common/errors/taskListNotOwnedByHostError.go | 10 +- common/log/tag/tags.go | 6 + common/membership/resolver.go | 9 + service/matching/config/config.go | 3 + service/matching/config/config_test.go | 1 + service/matching/handler/engine.go | 94 ++++-- .../handler/engine_integration_test.go | 57 +--- service/matching/handler/engine_test.go | 108 ++++++ service/matching/handler/interfaces.go | 3 +- service/matching/handler/interfaces_mock.go | 12 + service/matching/handler/membership.go | 150 +++++++++ service/matching/handler/membership_test.go | 307 ++++++++++++++++++ 13 files changed, 691 insertions(+), 81 deletions(-) create mode 100644 service/matching/handler/membership.go create mode 100644 service/matching/handler/membership_test.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 57b9eb63a09..c46aee7b232 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1672,6 +1672,13 @@ const ( // Default value: false // Allowed filters: DomainID MatchingEnableTaskInfoLogByDomainID + // MatchingEnableTasklistGuardAgainstOwnershipShardLoss + // enables guards to prevent tasklists from processing if there is any detection that the host + // no longer is active or owns the shard + // KeyName: matching.enableTasklistGuardAgainstOwnershipLoss + // Value type: Bool + // Default value: false + MatchingEnableTasklistGuardAgainstOwnershipShardLoss // key for history @@ -4116,6 +4123,11 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "MatchingEnableTaskInfoLogByDomainID is enables info level logs for decision/activity task based on the request domainID", DefaultValue: false, }, + MatchingEnableTasklistGuardAgainstOwnershipShardLoss: { + KeyName: 
"matching.enableTasklistGuardAgainstOwnershipLoss", + Description: "allows guards to ensure that tasklists don't continue processing if there's signal that they've lost ownership", + DefaultValue: false, + }, EventsCacheGlobalEnable: { KeyName: "history.eventsCacheGlobalEnable", Description: "EventsCacheGlobalEnable is enables global cache over all history shards", diff --git a/common/errors/taskListNotOwnedByHostError.go b/common/errors/taskListNotOwnedByHostError.go index 15a8ba21cce..b1c4a566a82 100644 --- a/common/errors/taskListNotOwnedByHostError.go +++ b/common/errors/taskListNotOwnedByHostError.go @@ -24,21 +24,21 @@ package errors import "fmt" -var _ error = &TaskListNotOwnnedByHostError{} +var _ error = &TaskListNotOwnedByHostError{} -type TaskListNotOwnnedByHostError struct { +type TaskListNotOwnedByHostError struct { OwnedByIdentity string MyIdentity string TasklistName string } -func (m *TaskListNotOwnnedByHostError) Error() string { +func (m *TaskListNotOwnedByHostError) Error() string { return fmt.Sprintf("task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s", m.OwnedByIdentity, m.MyIdentity, m.TasklistName) } -func NewTaskListNotOwnnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnnedByHostError { - return &TaskListNotOwnnedByHostError{ +func NewTaskListNotOwnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnedByHostError { + return &TaskListNotOwnedByHostError{ OwnedByIdentity: ownedByIdentity, MyIdentity: myIdentity, TasklistName: tasklistName, diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go index a0d3b762017..00aa2b21233 100644 --- a/common/log/tag/tags.go +++ b/common/log/tag/tags.go @@ -956,6 +956,12 @@ func VisibilityQuery(query string) Tag { return newStringTag("visibility-query", query) } +// MembershipChangeEvent is a predefined tag for when logging hashring change events, +// expected to be of type membership.ChangeEvent 
+func MembershipChangeEvent(event interface{}) Tag { + return newPredefinedDynamicTag("membership-change-event", event) +} + // Dynamic Uses reflection based logging for arbitrary values // for not very performant logging func Dynamic(key string, v interface{}) Tag { diff --git a/common/membership/resolver.go b/common/membership/resolver.go index 8afb0347d55..17d060fb2a4 100644 --- a/common/membership/resolver.go +++ b/common/membership/resolver.go @@ -25,6 +25,7 @@ package membership import ( "fmt" + "sync" "sync/atomic" "github.com/uber/cadence/common" @@ -84,6 +85,7 @@ type MultiringResolver struct { status int32 provider PeerProvider + mu sync.Mutex rings map[string]*ring } @@ -110,6 +112,7 @@ func NewMultiringResolver( provider: provider, rings: make(map[string]*ring), metrics: metricsClient, + mu: sync.Mutex{}, } for _, s := range services { @@ -130,6 +133,8 @@ func (rpo *MultiringResolver) Start() { rpo.provider.Start() + rpo.mu.Lock() + defer rpo.mu.Unlock() for _, ring := range rpo.rings { ring.Start() } @@ -145,6 +150,8 @@ func (rpo *MultiringResolver) Stop() { return } + rpo.mu.Lock() + defer rpo.mu.Unlock() for _, ring := range rpo.rings { ring.Stop() } @@ -163,6 +170,8 @@ func (rpo *MultiringResolver) EvictSelf() error { } func (rpo *MultiringResolver) getRing(service string) (*ring, error) { + rpo.mu.Lock() + defer rpo.mu.Unlock() ring, found := rpo.rings[service] if !found { return nil, fmt.Errorf("service %q is not tracked by Resolver", service) diff --git a/service/matching/config/config.go b/service/matching/config/config.go index 1e1e02184ab..a4c9b20abcd 100644 --- a/service/matching/config/config.go +++ b/service/matching/config/config.go @@ -81,6 +81,8 @@ type ( TaskDispatchRPSTTL time.Duration // task gc configuration MaxTimeBetweenTaskDeletes time.Duration + + EnableTasklistOwnershipGuard dynamicconfig.BoolPropertyFn } ForwarderConfig struct { @@ -158,6 +160,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { 
EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation), AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()), AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout), + EnableTasklistOwnershipGuard: dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss), LocalPollWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalPollWaitTime), LocalTaskWaitTime: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.LocalTaskWaitTime), HostName: hostName, diff --git a/service/matching/config/config_test.go b/service/matching/config/config_test.go index d6073776930..9924a534bec 100644 --- a/service/matching/config/config_test.go +++ b/service/matching/config/config_test.go @@ -79,6 +79,7 @@ func TestNewConfig(t *testing.T) { "TaskDispatchRPS": {nil, 100000.0}, "TaskDispatchRPSTTL": {nil, time.Minute}, "MaxTimeBetweenTaskDeletes": {nil, time.Second}, + "EnableTasklistOwnershipGuard": {dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss, false}, } client := dynamicconfig.NewInMemoryClient() for fieldName, expected := range fields { diff --git a/service/matching/handler/engine.go b/service/matching/handler/engine.go index a66cc5352fa..d685ae6189e 100644 --- a/service/matching/handler/engine.go +++ b/service/matching/handler/engine.go @@ -78,6 +78,8 @@ type ( } matchingEngineImpl struct { + shutdownCompletion *sync.WaitGroup + shutdown chan struct{} taskManager persistence.TaskManager clusterMetadata cluster.Metadata historyService history.Client @@ -120,7 +122,8 @@ var ( var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented // NewEngine creates an instance of matching engine -func NewEngine(taskManager persistence.TaskManager, +func NewEngine( + taskManager persistence.TaskManager, clusterMetadata cluster.Metadata, historyService history.Client, 
matchingClient matching.Client, @@ -132,7 +135,10 @@ func NewEngine(taskManager persistence.TaskManager, partitioner partition.Partitioner, timeSource clock.TimeSource, ) Engine { + e := &matchingEngineImpl{ + shutdown: make(chan struct{}), + shutdownCompletion: &sync.WaitGroup{}, taskManager: taskManager, clusterMetadata: clusterMetadata, historyService: historyService, @@ -149,19 +155,24 @@ func NewEngine(taskManager persistence.TaskManager, partitioner: partitioner, timeSource: timeSource, } + + e.shutdownCompletion.Add(1) + go e.subscribeToMembershipChanges() + e.waitForQueryResultFn = e.waitForQueryResult return e } func (e *matchingEngineImpl) Start() { - // As task lists are initialized lazily nothing is done on startup at this point. } func (e *matchingEngineImpl) Stop() { + close(e.shutdown) // Executes Stop() on each task list outside of lock for _, l := range e.getTaskLists(math.MaxInt32) { l.Stop() } + e.shutdownCompletion.Wait() } func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager { @@ -200,26 +211,9 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t } e.taskListsLock.RUnlock() - // Defensive check to make sure we actually own the task list - // If we try to create a task list manager for a task list that is not owned by us, return an error - // The new task list manager will steal the task list from the current owner, which should only happen if - // the task list is owned by the current host. 
- taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName()) + err := e.errIfShardLoss(taskList) if err != nil { - return nil, fmt.Errorf("failed to lookup task list owner: %w", err) - } - - self, err := e.membershipResolver.WhoAmI() - if err != nil { - return nil, fmt.Errorf("failed to lookup self im membership: %w", err) - } - - if taskListOwner.Identity() != self.Identity() { - return nil, cadence_errors.NewTaskListNotOwnnedByHostError( - taskListOwner.Identity(), - self.Identity(), - taskList.GetName(), - ) + return nil, err } // If it gets here, write lock and check again in case a task list is created between the two locks @@ -1202,6 +1196,64 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog( } } +func (e *matchingEngineImpl) errIfShardLoss(taskList *tasklist.Identifier) error { + if !e.config.EnableTasklistOwnershipGuard() { + return nil + } + + self, err := e.membershipResolver.WhoAmI() + if err != nil { + return fmt.Errorf("failed to lookup self im membership: %w", err) + } + + if e.isShuttingDown() { + e.logger.Warn("request to get tasklist is being rejected because engine is shutting down", + tag.WorkflowDomainID(taskList.GetDomainID()), + tag.WorkflowTaskListType(taskList.GetType()), + tag.WorkflowTaskListName(taskList.GetName()), + ) + + return cadence_errors.NewTaskListNotOwnedByHostError( + "not known", + self.Identity(), + taskList.GetName(), + ) + } + + // Defensive check to make sure we actually own the task list + // If we try to create a task list manager for a task list that is not owned by us, return an error + // The new task list manager will steal the task list from the current owner, which should only happen if + // the task list is owned by the current host. 
+ taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName()) + if err != nil { + return fmt.Errorf("failed to lookup task list owner: %w", err) + } + + if taskListOwner.Identity() != self.Identity() { + e.logger.Warn("Request to get tasklist is being rejected because engine does not own this shard", + tag.WorkflowDomainID(taskList.GetDomainID()), + tag.WorkflowTaskListType(taskList.GetType()), + tag.WorkflowTaskListName(taskList.GetName()), + ) + return cadence_errors.NewTaskListNotOwnedByHostError( + taskListOwner.Identity(), + self.Identity(), + taskList.GetName(), + ) + } + + return nil +} + +func (e *matchingEngineImpl) isShuttingDown() bool { + select { + case <-e.shutdown: + return true + default: + return false + } +} + func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) { m.Lock() defer m.Unlock() diff --git a/service/matching/handler/engine_integration_test.go b/service/matching/handler/engine_integration_test.go index a3e613c60e8..73c3201a1e9 100644 --- a/service/matching/handler/engine_integration_test.go +++ b/service/matching/handler/engine_integration_test.go @@ -34,7 +34,6 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/uber-go/tally" "go.uber.org/yarpc" @@ -45,7 +44,6 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/dynamicconfig" - cadence_errors "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/isolationgroup/defaultisolationgroupstate" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -55,6 +53,7 @@ import ( "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/partition" "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" 
"github.com/uber/cadence/service/matching/tasklist" @@ -131,6 +130,7 @@ func (s *matchingEngineSuite) SetupTest() { s.mockMembershipResolver = membership.NewMockResolver(s.controller) s.mockMembershipResolver.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(membership.HostInfo{}, nil).AnyTimes() s.mockMembershipResolver.EXPECT().WhoAmI().Return(membership.HostInfo{}, nil).AnyTimes() + s.mockMembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes() s.mockIsolationStore = dynamicconfig.NewMockClient(s.controller) dcClient := dynamicconfig.NewInMemoryClient() dcClient.UpdateValue(dynamicconfig.EnableTasklistIsolation, true) @@ -1303,58 +1303,6 @@ func (s *matchingEngineSuite) TestConfigDefaultHostName() { s.EqualValues(configEmpty.HostName, "") } -func (s *matchingEngineSuite) TestGetTaskListManager_OwnerShip() { - testCases := []struct { - name string - lookUpResult string - lookUpErr error - whoAmIResult string - whoAmIErr error - - expectedError error - }{ - { - name: "Not owned by current host", - lookUpResult: "A", - whoAmIResult: "B", - expectedError: new(cadence_errors.TaskListNotOwnnedByHostError), - }, - { - name: "LookupError", - lookUpErr: assert.AnError, - expectedError: assert.AnError, - }, - { - name: "WhoAmIError", - whoAmIErr: assert.AnError, - expectedError: assert.AnError, - }, - } - - for _, tc := range testCases { - s.T().Run(tc.name, func(t *testing.T) { - resolverMock := membership.NewMockResolver(s.controller) - s.matchingEngine.membershipResolver = resolverMock - - resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return( - membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr, - ).AnyTimes() - resolverMock.EXPECT().WhoAmI().Return( - membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr, - ).AnyTimes() - - taskListKind := types.TaskListKindNormal - - _, err := s.matchingEngine.getTaskListManager( - 
tasklist.NewTestTaskListID(s.T(), "domain", "tasklist", persistence.TaskListTypeActivity), - &taskListKind, - ) - - assert.ErrorAs(s.T(), err, &tc.expectedError) - }) - } -} - func newActivityTaskScheduledEvent(eventID int64, decisionTaskCompletedEventID int64, scheduleAttributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent { historyEvent := newHistoryEvent(eventID, types.EventTypeActivityTaskScheduled) @@ -1402,6 +1350,7 @@ func defaultTestConfig() *config.Config { config.GetTasksBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(10) config.AsyncTaskDispatchTimeout = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond) config.MaxTimeBetweenTaskDeletes = time.Duration(0) + config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { return true } return config } diff --git a/service/matching/handler/engine_test.go b/service/matching/handler/engine_test.go index 87c84088362..66004d08c84 100644 --- a/service/matching/handler/engine_test.go +++ b/service/matching/handler/engine_test.go @@ -26,6 +26,7 @@ import ( "context" "errors" "fmt" + "sync" "testing" "github.com/golang/mock/gomock" @@ -36,8 +37,10 @@ import ( "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/matching/config" "github.com/uber/cadence/service/matching/tasklist" @@ -617,3 +620,108 @@ func TestWaitForQueryResult(t *testing.T) { }) } } + +func TestIsShuttingDown(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(0) + e := matchingEngineImpl{ + shutdownCompletion: &wg, + shutdown: make(chan struct{}), + } + e.Start() + assert.False(t, e.isShuttingDown()) + e.Stop() + assert.True(t, 
e.isShuttingDown()) +} + +func TestGetTasklistsNotOwned(t *testing.T) { + + ctrl := gomock.NewController(t) + resolver := membership.NewMockResolver(ctrl) + + resolver.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("self", "host123", nil), nil) + + tl1, _ := tasklist.NewIdentifier("", "tl1", 0) + tl2, _ := tasklist.NewIdentifier("", "tl2", 0) + tl3, _ := tasklist.NewIdentifier("", "tl3", 0) + + tl1m := tasklist.NewMockManager(ctrl) + tl2m := tasklist.NewMockManager(ctrl) + tl3m := tasklist.NewMockManager(ctrl) + + resolver.EXPECT().Lookup(service.Matching, tl1.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl2.GetName()).Return(membership.NewDetailedHostInfo("", "host456", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl3.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + + e := matchingEngineImpl{ + shutdown: make(chan struct{}), + membershipResolver: resolver, + taskListsLock: sync.RWMutex{}, + taskLists: map[tasklist.Identifier]tasklist.Manager{ + *tl1: tl1m, + *tl2: tl2m, + *tl3: tl3m, + }, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + logger: loggerimpl.NewNopLogger(), + } + + tls, err := e.getNonOwnedTasklistsLocked() + assert.NoError(t, err) + + assert.Equal(t, []tasklist.Manager{tl2m}, tls) +} + +func TestShutDownTasklistsNotOwned(t *testing.T) { + + ctrl := gomock.NewController(t) + resolver := membership.NewMockResolver(ctrl) + + resolver.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("self", "host123", nil), nil) + + tl1, _ := tasklist.NewIdentifier("", "tl1", 0) + tl2, _ := tasklist.NewIdentifier("", "tl2", 0) + tl3, _ := tasklist.NewIdentifier("", "tl3", 0) + + tl1m := tasklist.NewMockManager(ctrl) + tl2m := tasklist.NewMockManager(ctrl) + tl3m := tasklist.NewMockManager(ctrl) + + resolver.EXPECT().Lookup(service.Matching, 
tl1.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl2.GetName()).Return(membership.NewDetailedHostInfo("", "host456", nil), nil) + resolver.EXPECT().Lookup(service.Matching, tl3.GetName()).Return(membership.NewDetailedHostInfo("", "host123", nil), nil) + + e := matchingEngineImpl{ + shutdown: make(chan struct{}), + membershipResolver: resolver, + taskListsLock: sync.RWMutex{}, + taskLists: map[tasklist.Identifier]tasklist.Manager{ + *tl1: tl1m, + *tl2: tl2m, + *tl3: tl3m, + }, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + metricsClient: metrics.NewNoopMetricsClient(), + logger: loggerimpl.NewNopLogger(), + } + + wg := sync.WaitGroup{} + + wg.Add(1) + + tl2m.EXPECT().TaskListID().Return(tl2).AnyTimes() + tl2m.EXPECT().String().AnyTimes() + + tl2m.EXPECT().Stop().Do(func() { + wg.Done() + }) + + err := e.shutDownNonOwnedTasklists() + wg.Wait() + + assert.NoError(t, err) +} diff --git a/service/matching/handler/interfaces.go b/service/matching/handler/interfaces.go index 4c82fa334b3..21f39f160e0 100644 --- a/service/matching/handler/interfaces.go +++ b/service/matching/handler/interfaces.go @@ -34,7 +34,8 @@ import ( type ( // Engine exposes interfaces for clients to poll for activity and decision tasks. 
Engine interface { - Stop() + common.Daemon + AddDecisionTask(hCtx *handlerContext, request *types.AddDecisionTaskRequest) (syncMatch bool, err error) AddActivityTask(hCtx *handlerContext, request *types.AddActivityTaskRequest) (syncMatch bool, err error) PollForDecisionTask(hCtx *handlerContext, request *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error) diff --git a/service/matching/handler/interfaces_mock.go b/service/matching/handler/interfaces_mock.go index e94141e872f..78401a46838 100644 --- a/service/matching/handler/interfaces_mock.go +++ b/service/matching/handler/interfaces_mock.go @@ -206,6 +206,18 @@ func (mr *MockEngineMockRecorder) RespondQueryTaskCompleted(hCtx, request interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondQueryTaskCompleted", reflect.TypeOf((*MockEngine)(nil).RespondQueryTaskCompleted), hCtx, request) } +// Start mocks base method. +func (m *MockEngine) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start. +func (mr *MockEngineMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockEngine)(nil).Start)) +} + // Stop mocks base method. func (m *MockEngine) Stop() { m.ctrl.T.Helper() diff --git a/service/matching/handler/membership.go b/service/matching/handler/membership.go new file mode 100644 index 00000000000..88964d19dd7 --- /dev/null +++ b/service/matching/handler/membership.go @@ -0,0 +1,150 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. 
+ +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package handler + +import ( + "fmt" + "sync" + + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/service/matching/tasklist" +) + +const subscriptionBufferSize = 1000 + +// Because there's a bunch of conditions under which matching may be holding a tasklist +// reader daemon and other live processes but when it doesn't (according to the rest of the hashring) +// own the tasklist anymore, this listener watches for membership changes and purges anything disused +// in the hashring on membership changes. +// +// Combined with the guard on tasklist instantiation, it should prevent incorrect or poorly timed +// creating of tasklist ownership and database shard thrashing between hosts while they figure out +// which host is the real owner of the tasklist. 
+// +// This is not the main shutdown process, it's just an optimization. +func (e *matchingEngineImpl) subscribeToMembershipChanges() { + defer func() { + if r := recover(); r != nil { + e.logger.Error("matching membership watcher changes caused a panic, recovering", tag.Dynamic("recovered-panic", r)) + } + }() + + defer e.shutdownCompletion.Done() + + if !e.config.EnableTasklistOwnershipGuard() { + return + } + + listener := make(chan *membership.ChangedEvent, subscriptionBufferSize) + e.membershipResolver.Subscribe(service.Matching, "matching-engine", listener) + + for { + select { + case event := <-listener: + err := e.shutDownNonOwnedTasklists() + if err != nil { + e.logger.Error("Error while trying to determine if tasklists have been shutdown", + tag.Error(err), + tag.MembershipChangeEvent(event), + ) + } + case <-e.shutdown: + return + } + } +} + +func (e *matchingEngineImpl) shutDownNonOwnedTasklists() error { + if !e.config.EnableTasklistOwnershipGuard() { + return nil + } + noLongerOwned, err := e.getNonOwnedTasklistsLocked() + if err != nil { + return err + } + + tasklistsShutdownWG := sync.WaitGroup{} + + for _, tl := range noLongerOwned { + // for each of the tasklists that are no longer owned, kick off the + // process of stopping them. 
The stopping process is IO heavy and + // can take a while, so do them in parallel to efficiently unload tasklists not owned + tasklistsShutdownWG.Add(1) + go func(tl tasklist.Manager) { + + defer func() { + if r := recover(); r != nil { + e.logger.Error("panic occurred while trying to shut down tasklist", tag.Dynamic("recovered-panic", r)) + } + }() + defer tasklistsShutdownWG.Done() + + e.logger.Info("shutting down tasklist preemptively because they are no longer owned by this host", + tag.WorkflowTaskListType(tl.TaskListID().GetType()), + tag.WorkflowTaskListName(tl.TaskListID().GetName()), + tag.WorkflowDomainID(tl.TaskListID().GetDomainID()), + tag.Dynamic("tasklist-debug-info", tl.String()), + ) + + e.unloadTaskList(tl) + }(tl) + } + + tasklistsShutdownWG.Wait() + + return nil +} + +func (e *matchingEngineImpl) getNonOwnedTasklistsLocked() ([]tasklist.Manager, error) { + if !e.config.EnableTasklistOwnershipGuard() { + return nil, nil + } + + var toShutDown []tasklist.Manager + + e.taskListsLock.RLock() + defer e.taskListsLock.RUnlock() + + self, err := e.membershipResolver.WhoAmI() + if err != nil { + return nil, fmt.Errorf("failed to lookup self im membership: %w", err) + } + + for tl, manager := range e.taskLists { + taskListOwner, err := e.membershipResolver.Lookup(service.Matching, tl.GetName()) + if err != nil { + return nil, fmt.Errorf("failed to lookup task list owner: %w", err) + } + + if taskListOwner.Identity() != self.Identity() { + toShutDown = append(toShutDown, manager) + } + } + + e.logger.Info("Got list of non-owned-tasklists", + tag.Dynamic("tasklist-debug-info", toShutDown), + ) + return toShutDown, nil +} diff --git a/service/matching/handler/membership_test.go b/service/matching/handler/membership_test.go new file mode 100644 index 00000000000..2fcdf50d5d8 --- /dev/null +++ b/service/matching/handler/membership_test.go @@ -0,0 +1,307 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. 
+ +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package handler + +import ( + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" + + "github.com/uber/cadence/client/history" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/dynamicconfig" + cadence_errors "github.com/uber/cadence/common/errors" + "github.com/uber/cadence/common/log/loggerimpl" + "github.com/uber/cadence/common/membership" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/common/service" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/matching/config" + "github.com/uber/cadence/service/matching/tasklist" +) + +func TestGetTaskListManager_OwnerShip(t *testing.T) { + + testCases := []struct { + name string + lookUpResult string + lookUpErr error + whoAmIResult string + whoAmIErr error + tasklistGuardEnabled bool + + expectedError error + }{ + { + name: "Not owned by current host", + lookUpResult: "A", + whoAmIResult: "B", + tasklistGuardEnabled: true, + + expectedError: new(cadence_errors.TaskListNotOwnedByHostError), + }, + { + name: "LookupError", + lookUpErr: assert.AnError, + tasklistGuardEnabled: true, + expectedError: assert.AnError, + }, + { + name: "WhoAmIError", + whoAmIErr: assert.AnError, + tasklistGuardEnabled: true, + expectedError: assert.AnError, + }, + { + name: "when feature is not enabled, expect previous behaviour to continue", + lookUpResult: "A", + whoAmIResult: "B", + tasklistGuardEnabled: false, + + expectedError: nil, + }, + } + + for _, tc := range testCases { + + t.Run(tc.name, func(t *testing.T) { + + ctrl := gomock.NewController(t) + logger := loggerimpl.NewNopLogger() + + mockTimeSource := clock.NewMockedTimeSourceAt(time.Now()) + taskManager := tasklist.NewTestTaskManager(t, logger, 
mockTimeSource) + mockHistoryClient := history.NewMockClient(ctrl) + mockDomainCache := cache.NewMockDomainCache(ctrl) + resolverMock := membership.NewMockResolver(ctrl) + resolverMock.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).AnyTimes() + + // this is only if the call goes through + mockDomainCache.EXPECT().GetDomainByID(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes() + mockDomainCache.EXPECT().GetDomain(gomock.Any()).Return(cache.CreateDomainCacheEntry(matchingTestDomainName), nil).AnyTimes() + mockDomainCache.EXPECT().GetDomainName(gomock.Any()).Return(matchingTestDomainName, nil).AnyTimes() + + config := defaultTestConfig() + taskListEnabled := tc.tasklistGuardEnabled + config.EnableTasklistOwnershipGuard = func(opts ...dynamicconfig.FilterOption) bool { + return taskListEnabled + } + + matchingEngine := NewEngine( + taskManager, + cluster.GetTestClusterMetadata(true), + mockHistoryClient, + nil, + config, + logger, + metrics.NewClient(tally.NoopScope, metrics.Matching), + mockDomainCache, + resolverMock, + nil, + mockTimeSource, + ).(*matchingEngineImpl) + + resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return( + membership.NewDetailedHostInfo("", tc.lookUpResult, make(membership.PortMap)), tc.lookUpErr, + ).AnyTimes() + resolverMock.EXPECT().WhoAmI().Return( + membership.NewDetailedHostInfo("", tc.whoAmIResult, make(membership.PortMap)), tc.whoAmIErr, + ).AnyTimes() + + taskListKind := types.TaskListKindNormal + + _, err := matchingEngine.getTaskListManager( + tasklist.NewTestTaskListID(t, "domain", "tasklist", persistence.TaskListTypeActivity), + &taskListKind, + ) + if tc.expectedError != nil { + assert.ErrorAs(t, err, &tc.expectedError) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMembershipSubscriptionShutdown(t *testing.T) { + assert.NotPanics(t, func() { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + 
m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Times(1) + + e := matchingEngineImpl{ + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + go func() { + time.Sleep(time.Second) + close(e.shutdown) + }() + e.subscribeToMembershipChanges() + }) +} + +func TestMembershipSubscriptionPanicHandling(t *testing.T) { + assert.NotPanics(t, func() { + ctrl := gomock.NewController(t) + + r := resource.NewTest(t, ctrl, 0) + r.MembershipResolver.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).DoAndReturn(func(_, _, _ any) { + panic("a panic has occurred") + }) + + e := matchingEngineImpl{ + membershipResolver: r.MembershipResolver, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + logger: loggerimpl.NewNopLogger(), + shutdown: make(chan struct{}), + } + + e.subscribeToMembershipChanges() + }) +} + +func TestSubscriptionAndShutdown(t *testing.T) { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + shutdownWG := &sync.WaitGroup{} + shutdownWG.Add(1) + + e := matchingEngineImpl{ + shutdownCompletion: shutdownWG, + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + // anytimes here because this is quite a racy test and the actual assertions for the unsubscription logic will be separated out + m.EXPECT().WhoAmI().Return(membership.NewDetailedHostInfo("host2", "host2", nil), nil).AnyTimes() + m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do( + func(service string, name string, inc chan<- *membership.ChangedEvent) { + m := membership.ChangedEvent{ + HostsAdded: nil, 
+ HostsUpdated: nil, + HostsRemoved: []string{"host123"}, + } + inc <- &m + }) + + go func() { + // then call stop so the test can finish + time.Sleep(time.Second) + e.Stop() + }() + + e.subscribeToMembershipChanges() +} + +func TestSubscriptionAndErrorReturned(t *testing.T) { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + shutdownWG := sync.WaitGroup{} + shutdownWG.Add(1) + + e := matchingEngineImpl{ + shutdownCompletion: &shutdownWG, + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + // this should trigger the error case on a membership event + m.EXPECT().WhoAmI().Return(membership.HostInfo{}, assert.AnError).AnyTimes() + + m.EXPECT().Subscribe(service.Matching, "matching-engine", gomock.Any()).Do( + func(service string, name string, inc chan<- *membership.ChangedEvent) { + m := membership.ChangedEvent{ + HostsAdded: nil, + HostsUpdated: nil, + HostsRemoved: []string{"host123"}, + } + inc <- &m + }) + + go func() { + // then call stop so the test can finish + time.Sleep(time.Second) + e.Stop() + }() + + e.subscribeToMembershipChanges() +} + +func TestGetTasklistManagerShutdownScenario(t *testing.T) { + ctrl := gomock.NewController(t) + m := membership.NewMockResolver(ctrl) + + self := membership.NewDetailedHostInfo("self", "self", nil) + + m.EXPECT().WhoAmI().Return(self, nil).AnyTimes() + + shutdownWG := sync.WaitGroup{} + shutdownWG.Add(0) + + e := matchingEngineImpl{ + shutdownCompletion: &shutdownWG, + membershipResolver: m, + config: &config.Config{ + EnableTasklistOwnershipGuard: func(opts ...dynamicconfig.FilterOption) bool { return true }, + }, + shutdown: make(chan struct{}), + logger: loggerimpl.NewNopLogger(), + } + + // set this engine to be shutting down so as to trigger the tasklistGetTasklistByID guard + e.Stop() + + tl, _ := 
tasklist.NewIdentifier("domainid", "tl", 0) + kind := types.TaskListKindNormal + res, err := e.getTaskListManager(tl, &kind) + assertErr := &cadence_errors.TaskListNotOwnedByHostError{} + assert.ErrorAs(t, err, &assertErr) + assert.Nil(t, res) +}