diff --git a/PULL_REQUEST_TEMPLATE.md b/.github/pull_request_template.md similarity index 100% rename from PULL_REQUEST_TEMPLATE.md rename to .github/pull_request_template.md diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 4fc638b6eab..56f956d1aaf 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -227,6 +227,7 @@ func NewDomainCacheEntryForTest( isGlobalDomain bool, repConfig *persistence.DomainReplicationConfig, failoverVersion int64, + failoverEndtime *int64, clusterMetadata cluster.Metadata, ) *DomainCacheEntry { @@ -236,6 +237,7 @@ func NewDomainCacheEntryForTest( isGlobalDomain: isGlobalDomain, replicationConfig: repConfig, failoverVersion: failoverVersion, + failoverEndTime: failoverEndtime, clusterMetadata: clusterMetadata, } } @@ -794,6 +796,11 @@ func (entry *DomainCacheEntry) IsDomainPendingActive() bool { return entry.failoverEndTime != nil } +// GetDomainFailoverEndTime returns domain failover end time if it exists +func (entry *DomainCacheEntry) GetDomainFailoverEndTime() *int64 { + return entry.failoverEndTime +} + // GetReplicationPolicy return the derived workflow replication policy func (entry *DomainCacheEntry) GetReplicationPolicy() ReplicationPolicy { // frontend guarantee that the clusters always contains the active domain, so if the # of clusters is 1 diff --git a/common/domain/failover_watcher.go b/common/domain/failover_watcher.go new file mode 100644 index 00000000000..1abac30ec94 --- /dev/null +++ b/common/domain/failover_watcher.go @@ -0,0 +1,225 @@ +// Copyright (c) 2017-2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination failover_watcher_mock.go + +package domain + +import ( + "sync/atomic" + "time" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/dynamicconfig" +) + +const ( + updateDomainRetryInitialInterval = 50 * time.Millisecond + updateDomainRetryCoefficient = 2.0 + updateDomainMaxRetry = 3 +) + +type ( + // FailoverWatcher handles failover operation on domain entities + FailoverWatcher interface { + common.Daemon + } + + failoverWatcherImpl struct { + status int32 + shutdownChan chan struct{} + refreshInterval dynamicconfig.DurationPropertyFn + refreshJitter dynamicconfig.FloatPropertyFn + retryPolicy backoff.RetryPolicy + + metadataMgr persistence.MetadataManager + domainCache cache.DomainCache + timeSource clock.TimeSource + metrics metrics.Client + logger log.Logger + } +) + +var _ FailoverWatcher = (*failoverWatcherImpl)(nil) + +// NewFailoverWatcher initializes domain failover processor +func NewFailoverWatcher( + domainCache cache.DomainCache, + metadataMgr persistence.MetadataManager, + timeSource clock.TimeSource, + refreshInterval dynamicconfig.DurationPropertyFn, + refreshJitter dynamicconfig.FloatPropertyFn, + metrics metrics.Client, + logger log.Logger, +) FailoverWatcher { + + retryPolicy := &backoff.ExponentialRetryPolicy{} + retryPolicy.SetInitialInterval(updateDomainRetryInitialInterval) + retryPolicy.SetBackoffCoefficient(updateDomainRetryCoefficient) + retryPolicy.SetMaximumAttempts(updateDomainMaxRetry) + + return &failoverWatcherImpl{ + status: common.DaemonStatusInitialized, + shutdownChan: make(chan struct{}), + refreshInterval: 
refreshInterval, + refreshJitter: refreshJitter, + retryPolicy: retryPolicy, + domainCache: domainCache, + metadataMgr: metadataMgr, + timeSource: timeSource, + metrics: metrics, + logger: logger, + } +} + +func (p *failoverWatcherImpl) Start() { + if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { + return + } + + go p.refreshDomainLoop() + + p.logger.Info("Domain failover processor started.") +} + +func (p *failoverWatcherImpl) Stop() { + if !atomic.CompareAndSwapInt32(&p.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { + return + } + + close(p.shutdownChan) + p.logger.Info("Domain failover processor stop.") +} + +func (p *failoverWatcherImpl) refreshDomainLoop() { + + timer := time.NewTimer(backoff.JitDuration( + p.refreshInterval(), + p.refreshJitter(), + )) + defer timer.Stop() + + for { + select { + case <-p.shutdownChan: + return + case <-timer.C: + domains := p.domainCache.GetAllDomain() + for _, domain := range domains { + p.handleFailoverTimeout(domain) + select { + case <-p.shutdownChan: + p.logger.Debug("Stop refresh domain as the processing is stopping.") + return + default: + } + } + + timer.Reset(backoff.JitDuration( + p.refreshInterval(), + p.refreshJitter(), + )) + } + } +} + +func (p *failoverWatcherImpl) handleFailoverTimeout( + domain *cache.DomainCacheEntry, +) { + + failoverEndTime := domain.GetDomainFailoverEndTime() + if failoverEndTime != nil && p.timeSource.Now().After(time.Unix(0, *failoverEndTime)) { + domainName := domain.GetInfo().Name + // the failover end time (unix nanoseconds) has passed: clear the pending-active state of the domain + if err := CleanPendingActiveState( + p.metadataMgr, + domainName, + domain.GetFailoverVersion(), + p.retryPolicy, + ); err != nil { + p.metrics.IncCounter(metrics.DomainFailoverScope, metrics.CadenceFailures) + p.logger.Error("Failed to update pending-active domain to active.", tag.WorkflowDomainName(domainName), tag.Error(err)) + } + } +} + +// CleanPendingActiveState 
removes the pending active state from the domain +func CleanPendingActiveState( + metadataMgr persistence.MetadataManager, + domainName string, + failoverVersion int64, + policy backoff.RetryPolicy, +) error { + + // must get the metadata (notificationVersion) first + // this version can be regarded as the lock on the v2 domain table + // and since we do not know which table will return the domain afterwards + // this call has to be made + metadata, err := metadataMgr.GetMetadata() + if err != nil { + return err + } + notificationVersion := metadata.NotificationVersion + getResponse, err := metadataMgr.GetDomain(&persistence.GetDomainRequest{Name: domainName}) + if err != nil { + return err + } + localFailoverVersion := getResponse.FailoverVersion + isGlobalDomain := getResponse.IsGlobalDomain + gracefulFailoverEndTime := getResponse.FailoverEndTime + + if isGlobalDomain && gracefulFailoverEndTime != nil && failoverVersion == localFailoverVersion { + // if the domain is still pending active and the failover versions are the same, clean the state + updateReq := &persistence.UpdateDomainRequest{ + Info: getResponse.Info, + Config: getResponse.Config, + ReplicationConfig: getResponse.ReplicationConfig, + ConfigVersion: getResponse.ConfigVersion, + FailoverVersion: localFailoverVersion, + FailoverNotificationVersion: getResponse.FailoverNotificationVersion, + FailoverEndTime: nil, + NotificationVersion: notificationVersion, + } + op := func() error { + return metadataMgr.UpdateDomain(updateReq) + } + if err := backoff.Retry( + op, + policy, + isUpdateDomainRetryable, + ); err != nil { + return err + } + } + return nil +} + +func isUpdateDomainRetryable( + err error, +) bool { + return true +} diff --git a/common/domain/failover_watcher_mock.go b/common/domain/failover_watcher_mock.go new file mode 100644 index 00000000000..593a2ef8190 --- /dev/null +++ b/common/domain/failover_watcher_mock.go @@ -0,0 +1,81 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 
Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: failover_watcher.go + +// Package domain is a generated GoMock package. 
+package domain + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockFailoverWatcher is a mock of FailoverWatcher interface +type MockFailoverWatcher struct { + ctrl *gomock.Controller + recorder *MockFailoverWatcherMockRecorder +} + +// MockFailoverWatcherMockRecorder is the mock recorder for MockFailoverWatcher +type MockFailoverWatcherMockRecorder struct { + mock *MockFailoverWatcher +} + +// NewMockFailoverWatcher creates a new mock instance +func NewMockFailoverWatcher(ctrl *gomock.Controller) *MockFailoverWatcher { + mock := &MockFailoverWatcher{ctrl: ctrl} + mock.recorder = &MockFailoverWatcherMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockFailoverWatcher) EXPECT() *MockFailoverWatcherMockRecorder { + return m.recorder +} + +// Start mocks base method +func (m *MockFailoverWatcher) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockFailoverWatcherMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockFailoverWatcher)(nil).Start)) +} + +// Stop mocks base method +func (m *MockFailoverWatcher) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop +func (mr *MockFailoverWatcherMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockFailoverWatcher)(nil).Stop)) +} diff --git a/common/domain/failover_watcher_test.go b/common/domain/failover_watcher_test.go new file mode 100644 index 00000000000..58cd7eb87a7 --- /dev/null +++ b/common/domain/failover_watcher_test.go @@ -0,0 +1,237 @@ +// Copyright (c) 2017-2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package domain + +import ( + "log" + "os" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" + persistencetests "github.com/uber/cadence/common/persistence/persistence-tests" + "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/common/service/dynamicconfig" +) + +type ( + failoverWatcherSuite struct { + suite.Suite + persistencetests.TestBase + + *require.Assertions + controller *gomock.Controller + + mockResource *resource.Test + mockDomainCache *cache.MockDomainCache + timeSource clock.TimeSource + metadataMgr persistence.MetadataManager + watcher *failoverWatcherImpl + } +) + +func TestFailoverWatcherSuite(t *testing.T) { + s := new(failoverWatcherSuite) + suite.Run(t, s) +} + +func (s *failoverWatcherSuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } + + s.TestBase = persistencetests.NewTestBaseWithCassandra(&persistencetests.TestBaseOptions{ + ClusterMetadata: cluster.GetTestClusterMetadata(true, true), + }) + s.TestBase.Setup() +} + +func (s *failoverWatcherSuite) TearDownSuite() { + s.TestBase.TearDownWorkflowStore() +} + +func (s *failoverWatcherSuite) SetupTest() { + s.Assertions = require.New(s.T()) + s.controller = gomock.NewController(s.T()) + + s.mockResource = resource.NewTest(s.controller, metrics.DomainFailoverScope) + s.mockDomainCache = s.mockResource.DomainCache + s.metadataMgr = s.TestBase.MetadataManager + s.timeSource = s.mockResource.GetTimeSource() + s.watcher = NewFailoverWatcher( + s.mockDomainCache, + s.metadataMgr, + s.timeSource, + dynamicconfig.GetDurationPropertyFn(10*time.Second), + dynamicconfig.GetFloatPropertyFn(0.2), + 
s.mockResource.GetMetricsClient(), + s.mockResource.GetLogger(), + ).(*failoverWatcherImpl) +} + +func (s *failoverWatcherSuite) TearDownTest() { + s.controller.Finish() + s.mockResource.Finish(s.T()) + s.watcher.Stop() +} + +func (s *failoverWatcherSuite) TestCleanPendingActiveState() { + domainName := uuid.New() + info := &persistence.DomainInfo{ + ID: domainName, + Name: domainName, + Status: persistence.DomainStatusRegistered, + Description: "some random description", + OwnerEmail: "some random email", + Data: nil, + } + domainConfig := &persistence.DomainConfig{ + Retention: 1, + EmitMetric: true, + } + replicationConfig := &persistence.DomainReplicationConfig{ + ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), + Clusters: []*persistence.ClusterReplicationConfig{ + { + s.ClusterMetadata.GetCurrentClusterName(), + }, + }, + } + + _, err := s.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + IsGlobalDomain: true, + ConfigVersion: 1, + FailoverVersion: 1, + }) + s.NoError(err) + + // the domain was created without a failover end time, so this call must succeed + err = CleanPendingActiveState(s.metadataMgr, domainName, 1, s.watcher.retryPolicy) + s.NoError(err) + + metadata, err := s.metadataMgr.GetMetadata() + s.NoError(err) + notificationVersion := metadata.NotificationVersion + err = s.metadataMgr.UpdateDomain(&persistence.UpdateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: notificationVersion, + FailoverEndTime: common.Int64Ptr(2), + NotificationVersion: notificationVersion, + }) + s.NoError(err) + + // failover version passed in (5) differs from the version just written (2) + err = CleanPendingActiveState(s.metadataMgr, domainName, 5, s.watcher.retryPolicy) + s.NoError(err) + + err = CleanPendingActiveState(s.metadataMgr, domainName, 2, s.watcher.retryPolicy) + s.NoError(err) + + resp, err := 
s.metadataMgr.GetDomain(&persistence.GetDomainRequest{ + Name: domainName, + }) + s.NoError(err) + s.True(resp.FailoverEndTime == nil) +} + +func (s *failoverWatcherSuite) TestHandleFailoverTimeout() { + domainName := uuid.New() + info := &persistence.DomainInfo{ + ID: domainName, + Name: domainName, + Status: persistence.DomainStatusRegistered, + Description: "some random description", + OwnerEmail: "some random email", + Data: nil, + } + domainConfig := &persistence.DomainConfig{ + Retention: 1, + EmitMetric: true, + } + replicationConfig := &persistence.DomainReplicationConfig{ + ActiveClusterName: s.ClusterMetadata.GetCurrentClusterName(), + Clusters: []*persistence.ClusterReplicationConfig{ + { + s.ClusterMetadata.GetCurrentClusterName(), + }, + }, + } + + _, err := s.metadataMgr.CreateDomain(&persistence.CreateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + IsGlobalDomain: true, + ConfigVersion: 1, + FailoverVersion: 1, + }) + s.NoError(err) + + metadata, err := s.metadataMgr.GetMetadata() + s.NoError(err) + notificationVersion := metadata.NotificationVersion + + endtime := common.Int64Ptr(s.timeSource.Now().UnixNano() - 1) + err = s.metadataMgr.UpdateDomain(&persistence.UpdateDomainRequest{ + Info: info, + Config: domainConfig, + ReplicationConfig: replicationConfig, + ConfigVersion: 1, + FailoverVersion: 2, + FailoverNotificationVersion: notificationVersion, + FailoverEndTime: endtime, + NotificationVersion: notificationVersion, + }) + s.NoError(err) + domainEntry := cache.NewDomainCacheEntryForTest( + info, + domainConfig, + true, + replicationConfig, + 2, + endtime, + s.ClusterMetadata, + ) + s.watcher.handleFailoverTimeout(domainEntry) + + resp, err := s.metadataMgr.GetDomain(&persistence.GetDomainRequest{ + Name: domainName, + }) + s.NoError(err) + s.True(resp.FailoverEndTime == nil) +} diff --git a/common/domain/handler.go b/common/domain/handler.go index ebd806df34e..e8b7e13eb82 100644 --- 
a/common/domain/handler.go +++ b/common/domain/handler.go @@ -299,7 +299,7 @@ func (d *HandlerImpl) ListDomains( IsGlobalDomain: common.BoolPtr(domain.IsGlobalDomain), FailoverVersion: common.Int64Ptr(domain.FailoverVersion), } - desc.DomainInfo, desc.Configuration, desc.ReplicationConfiguration = d.createResponse(ctx, domain.Info, domain.Config, domain.ReplicationConfig) + desc.DomainInfo, desc.Configuration, desc.ReplicationConfiguration = d.createResponse(domain.Info, domain.Config, domain.ReplicationConfig) domains = append(domains, desc) } @@ -331,7 +331,7 @@ func (d *HandlerImpl) DescribeDomain( IsGlobalDomain: common.BoolPtr(resp.IsGlobalDomain), FailoverVersion: common.Int64Ptr(resp.FailoverVersion), } - response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(ctx, resp.Info, resp.Config, resp.ReplicationConfig) + response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(resp.Info, resp.Config, resp.ReplicationConfig) return response, nil } @@ -529,7 +529,7 @@ func (d *HandlerImpl) UpdateDomain( IsGlobalDomain: common.BoolPtr(isGlobalDomain), FailoverVersion: common.Int64Ptr(failoverVersion), } - response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(ctx, info, config, replicationConfig) + response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(info, config, replicationConfig) d.logger.Info("Update domain succeeded", tag.WorkflowDomainName(info.Name), @@ -583,7 +583,6 @@ func (d *HandlerImpl) DeprecateDomain( } func (d *HandlerImpl) createResponse( - ctx context.Context, info *persistence.DomainInfo, config *persistence.DomainConfig, replicationConfig *persistence.DomainReplicationConfig, diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 6411bcf3b09..e3e3fe5b9eb 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -640,6 +640,9 @@ const ( // 
BlobstoreClientDirectoryExistsScope tracks DirectoryExists calls to blobstore BlobstoreClientDirectoryExistsScope + // DomainFailoverScope is used in domain failover processor + DomainFailoverScope + NumCommonScopes ) @@ -1305,6 +1308,8 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ BlobstoreClientExistsScope: {operation: "BlobstoreClientExists", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}}, BlobstoreClientDeleteScope: {operation: "BlobstoreClientDelete", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}}, BlobstoreClientDirectoryExistsScope: {operation: "BlobstoreClientDirectoryExists", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}}, + + DomainFailoverScope: {operation: "DomainFailover"}, }, // Frontend Scope Names Frontend: { diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index eb62a3b89b6..1b6eba19be9 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -62,6 +62,7 @@ var keys = map[Key]string{ VisibilityArchivalStatus: "system.visibilityArchivalStatus", EnableReadFromVisibilityArchival: "system.enableReadFromVisibilityArchival", EnableDomainNotActiveAutoForwarding: "system.enableDomainNotActiveAutoForwarding", + EnableGracefulFailover: "system.enableGracefulFailover", TransactionSizeLimit: "system.transactionSizeLimit", MinRetentionDays: "system.minRetentionDays", MaxDecisionStartToCloseSeconds: "system.maxDecisionStartToCloseSeconds", @@ -82,32 +83,34 @@ var keys = map[Key]string{ MaxIDLengthLimit: "limit.maxIDLength", // frontend settings - FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS", - FrontendPersistenceGlobalMaxQPS: "frontend.persistenceGlobalMaxQPS", - FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize", - FrontendVisibilityListMaxQPS: "frontend.visibilityListMaxQPS", - FrontendESVisibilityListMaxQPS: "frontend.esVisibilityListMaxQPS", - 
FrontendMaxBadBinaries: "frontend.maxBadBinaries", - FrontendESIndexMaxResultWindow: "frontend.esIndexMaxResultWindow", - FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize", - FrontendRPS: "frontend.rps", - FrontendMaxDomainRPSPerInstance: "frontend.domainrps", - FrontendGlobalDomainRPS: "frontend.globalDomainrps", - FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns", - FrontendShutdownDrainDuration: "frontend.shutdownDrainDuration", - DisableListVisibilityByFilter: "frontend.disableListVisibilityByFilter", - FrontendThrottledLogRPS: "frontend.throttledLogRPS", - EnableClientVersionCheck: "frontend.enableClientVersionCheck", - ValidSearchAttributes: "frontend.validSearchAttributes", - SendRawWorkflowHistory: "frontend.sendRawWorkflowHistory", - FrontendEnableRPCReplication: "frontend.enableRPCReplication", - FrontendEnableCleanupReplicationTask: "frontend.enableCleanupReplicationTask", - SearchAttributesNumberOfKeysLimit: "frontend.searchAttributesNumberOfKeysLimit", - SearchAttributesSizeOfValueLimit: "frontend.searchAttributesSizeOfValueLimit", - SearchAttributesTotalSizeLimit: "frontend.searchAttributesTotalSizeLimit", - VisibilityArchivalQueryMaxPageSize: "frontend.visibilityArchivalQueryMaxPageSize", - VisibilityArchivalQueryMaxRangeInDays: "frontend.visibilityArchivalQueryMaxRangeInDays", - VisibilityArchivalQueryMaxQPS: "frontend.visibilityArchivalQueryMaxQPS", + FrontendPersistenceMaxQPS: "frontend.persistenceMaxQPS", + FrontendPersistenceGlobalMaxQPS: "frontend.persistenceGlobalMaxQPS", + FrontendVisibilityMaxPageSize: "frontend.visibilityMaxPageSize", + FrontendVisibilityListMaxQPS: "frontend.visibilityListMaxQPS", + FrontendESVisibilityListMaxQPS: "frontend.esVisibilityListMaxQPS", + FrontendMaxBadBinaries: "frontend.maxBadBinaries", + FrontendESIndexMaxResultWindow: "frontend.esIndexMaxResultWindow", + FrontendHistoryMaxPageSize: "frontend.historyMaxPageSize", + FrontendRPS: "frontend.rps", + FrontendMaxDomainRPSPerInstance: 
"frontend.domainrps", + FrontendGlobalDomainRPS: "frontend.globalDomainrps", + FrontendHistoryMgrNumConns: "frontend.historyMgrNumConns", + FrontendShutdownDrainDuration: "frontend.shutdownDrainDuration", + DisableListVisibilityByFilter: "frontend.disableListVisibilityByFilter", + FrontendThrottledLogRPS: "frontend.throttledLogRPS", + EnableClientVersionCheck: "frontend.enableClientVersionCheck", + ValidSearchAttributes: "frontend.validSearchAttributes", + SendRawWorkflowHistory: "frontend.sendRawWorkflowHistory", + FrontendEnableRPCReplication: "frontend.enableRPCReplication", + FrontendEnableCleanupReplicationTask: "frontend.enableCleanupReplicationTask", + SearchAttributesNumberOfKeysLimit: "frontend.searchAttributesNumberOfKeysLimit", + SearchAttributesSizeOfValueLimit: "frontend.searchAttributesSizeOfValueLimit", + SearchAttributesTotalSizeLimit: "frontend.searchAttributesTotalSizeLimit", + VisibilityArchivalQueryMaxPageSize: "frontend.visibilityArchivalQueryMaxPageSize", + VisibilityArchivalQueryMaxRangeInDays: "frontend.visibilityArchivalQueryMaxRangeInDays", + VisibilityArchivalQueryMaxQPS: "frontend.visibilityArchivalQueryMaxQPS", + DomainFailoverRefreshInterval: "frontend.domainFailoverRefreshInterval", + DomainFailoverRefreshTimerJitterCoefficient: "frontend.domainFailoverRefreshTimerJitterCoefficient", // matching settings MatchingRPS: "matching.rps", @@ -333,6 +336,8 @@ const ( // EnableDomainNotActiveAutoForwarding whether enabling DC auto forwarding to active cluster // for signal / start / signal with start API if domain is not active EnableDomainNotActiveAutoForwarding + // EnableGracefulFailover whether enabling graceful failover + EnableGracefulFailover // TransactionSizeLimit is the largest allowed transaction size to persistence TransactionSizeLimit // MinRetentionDays is the minimal allowed retention days for domain @@ -417,6 +422,11 @@ const ( // VisibilityArchivalQueryMaxQPS is the timeout for a visibility archival query 
VisibilityArchivalQueryMaxQPS + // DomainFailoverRefreshInterval is the base interval of the domain failover refresh timer + DomainFailoverRefreshInterval + // DomainFailoverRefreshTimerJitterCoefficient is the jitter coefficient for the domain failover refresh timer + DomainFailoverRefreshTimerJitterCoefficient + // key for matching // MatchingRPS is request rate per second for each matching host diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index bd7bc1b9e46..2e86fbb1db9 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -63,6 +63,7 @@ type ( params *service.BootstrapParams config *Config domainDLQHandler domain.DLQMessageHandler + domainFailoverWatcher domain.FailoverWatcher eventSerializder persistence.PayloadSerializer } @@ -105,6 +106,15 @@ func NewAdminHandler( resource.GetDomainReplicationQueue(), resource.GetLogger(), ), + domainFailoverWatcher: domain.NewFailoverWatcher( + resource.GetDomainCache(), + resource.GetMetadataManager(), + resource.GetTimeSource(), + config.DomainFailoverRefreshInterval, + config.DomainFailoverRefreshTimerJitterCoefficient, + resource.GetMetricsClient(), + resource.GetLogger(), + ), eventSerializder: persistence.NewPayloadSerializer(), } } @@ -121,12 +131,17 @@ func (adh *AdminHandler) Start() { // If the queue does not start, we can still call stop() adh.Resource.GetDomainReplicationQueue().Start() } + + if adh.config.EnableGracefulFailover() { + adh.domainFailoverWatcher.Start() + } } // Stop stops the handler func (adh *AdminHandler) Stop() { // Calling stop if the queue does not start is ok adh.Resource.GetDomainReplicationQueue().Stop() + adh.domainFailoverWatcher.Stop() } // AddSearchAttribute add search attribute to whitelist diff --git a/service/frontend/adminHandler_test.go b/service/frontend/adminHandler_test.go index 6c468feef87..9ed870e0b8c 100644 --- a/service/frontend/adminHandler_test.go +++ b/service/frontend/adminHandler_test.go @@ -94,6 +94,7 @@ func (s 
*adminHandlerSuite) SetupTest() { config := &Config{ EnableAdminProtection: dynamicconfig.GetBoolPropertyFn(false), EnableCleanupReplicationTask: dynamicconfig.GetBoolPropertyFn(false), + EnableGracefulFailover: dynamicconfig.GetBoolPropertyFn(false), } s.handler = NewAdminHandler(s.mockResource, params, config) s.handler.Start() diff --git a/service/frontend/service.go b/service/frontend/service.go index ef0d6727349..301268ecef6 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -81,7 +81,10 @@ type Config struct { ThrottledLogRPS dynamicconfig.IntPropertyFn // Domain specific config - EnableDomainNotActiveAutoForwarding dynamicconfig.BoolPropertyFnWithDomainFilter + EnableDomainNotActiveAutoForwarding dynamicconfig.BoolPropertyFnWithDomainFilter + EnableGracefulFailover dynamicconfig.BoolPropertyFn + DomainFailoverRefreshInterval dynamicconfig.DurationPropertyFn + DomainFailoverRefreshTimerJitterCoefficient dynamicconfig.FloatPropertyFn // ValidSearchAttributes is legal indexed keys that can be used in list APIs ValidSearchAttributes dynamicconfig.MapPropertyFn @@ -101,42 +104,45 @@ type Config struct { // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFromES bool) *Config { return &Config{ - NumHistoryShards: numHistoryShards, - PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceMaxQPS, 2000), - PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceGlobalMaxQPS, 0), - VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), - EnableVisibilitySampling: dc.GetBoolProperty(dynamicconfig.EnableVisibilitySampling, true), - EnableReadFromClosedExecutionV2: dc.GetBoolProperty(dynamicconfig.EnableReadFromClosedExecutionV2, false), - VisibilityListMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityListMaxQPS, 1), - EnableReadVisibilityFromES: 
dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES), - ESVisibilityListMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendESVisibilityListMaxQPS, 3), - ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000), - HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), - RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200), - MaxDomainRPSPerInstance: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxDomainRPSPerInstance, 1200), - GlobalDomainRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendGlobalDomainRPS, 0), - MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000), - HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10), - MaxBadBinaries: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, domain.MaxBadBinaries), - EnableAdminProtection: dc.GetBoolProperty(dynamicconfig.EnableAdminProtection, false), - AdminOperationToken: dc.GetStringProperty(dynamicconfig.AdminOperationToken, common.DefaultAdminOperationToken), - DisableListVisibilityByFilter: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisableListVisibilityByFilter, false), - BlobSizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 2*1024*1024), - BlobSizeLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitWarn, 256*1024), - ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.FrontendThrottledLogRPS, 20), - ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.FrontendShutdownDrainDuration, 0), - EnableDomainNotActiveAutoForwarding: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableDomainNotActiveAutoForwarding, true), - EnableClientVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableClientVersionCheck, false), - ValidSearchAttributes: dc.GetMapProperty(dynamicconfig.ValidSearchAttributes, 
definition.GetDefaultIndexedKeys()), - SearchAttributesNumberOfKeysLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesNumberOfKeysLimit, 100), - SearchAttributesSizeOfValueLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesSizeOfValueLimit, 2*1024), - SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024), - MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.MinRetentionDays), - VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000), - DisallowQuery: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisallowQuery, false), - SendRawWorkflowHistory: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.SendRawWorkflowHistory, false), - EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false), - EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true), + NumHistoryShards: numHistoryShards, + PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceMaxQPS, 2000), + PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceGlobalMaxQPS, 0), + VisibilityMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityMaxPageSize, 1000), + EnableVisibilitySampling: dc.GetBoolProperty(dynamicconfig.EnableVisibilitySampling, true), + EnableReadFromClosedExecutionV2: dc.GetBoolProperty(dynamicconfig.EnableReadFromClosedExecutionV2, false), + VisibilityListMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendVisibilityListMaxQPS, 1), + EnableReadVisibilityFromES: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableReadVisibilityFromES, enableReadFromES), + ESVisibilityListMaxQPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendESVisibilityListMaxQPS, 3), + ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 
10000), + HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), + RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200), + MaxDomainRPSPerInstance: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxDomainRPSPerInstance, 1200), + GlobalDomainRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendGlobalDomainRPS, 0), + MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000), + HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10), + MaxBadBinaries: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, domain.MaxBadBinaries), + EnableAdminProtection: dc.GetBoolProperty(dynamicconfig.EnableAdminProtection, false), + AdminOperationToken: dc.GetStringProperty(dynamicconfig.AdminOperationToken, common.DefaultAdminOperationToken), + DisableListVisibilityByFilter: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisableListVisibilityByFilter, false), + BlobSizeLimitError: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitError, 2*1024*1024), + BlobSizeLimitWarn: dc.GetIntPropertyFilteredByDomain(dynamicconfig.BlobSizeLimitWarn, 256*1024), + ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.FrontendThrottledLogRPS, 20), + ShutdownDrainDuration: dc.GetDurationProperty(dynamicconfig.FrontendShutdownDrainDuration, 0), + EnableDomainNotActiveAutoForwarding: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableDomainNotActiveAutoForwarding, true), + EnableGracefulFailover: dc.GetBoolProperty(dynamicconfig.EnableGracefulFailover, false), + DomainFailoverRefreshInterval: dc.GetDurationProperty(dynamicconfig.DomainFailoverRefreshInterval, 10*time.Second), + DomainFailoverRefreshTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.DomainFailoverRefreshTimerJitterCoefficient, 0.1), + EnableClientVersionCheck: dc.GetBoolProperty(dynamicconfig.EnableClientVersionCheck, false), + ValidSearchAttributes: 
dc.GetMapProperty(dynamicconfig.ValidSearchAttributes, definition.GetDefaultIndexedKeys()), + SearchAttributesNumberOfKeysLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesNumberOfKeysLimit, 100), + SearchAttributesSizeOfValueLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesSizeOfValueLimit, 2*1024), + SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024), + MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.MinRetentionDays), + VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000), + DisallowQuery: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.DisallowQuery, false), + SendRawWorkflowHistory: dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.SendRawWorkflowHistory, false), + EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.FrontendEnableRPCReplication, false), + EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.FrontendEnableCleanupReplicationTask, true), } } diff --git a/service/history/execution/mutable_state_builder_test.go b/service/history/execution/mutable_state_builder_test.go index 6ddbea74583..9dc1a6315bd 100644 --- a/service/history/execution/mutable_state_builder_test.go +++ b/service/history/execution/mutable_state_builder_test.go @@ -709,6 +709,7 @@ func (s *mutableStateSuite) newDomainCacheEntry() *cache.DomainCacheEntry { &persistence.DomainReplicationConfig{}, 1, nil, + nil, ) } diff --git a/service/history/task/timer_task_executor_base_test.go b/service/history/task/timer_task_executor_base_test.go index 56033f10107..fa8201dc05b 100644 --- a/service/history/task/timer_task_executor_base_test.go +++ b/service/history/task/timer_task_executor_base_test.go @@ -157,7 +157,15 @@ func (s *timerQueueTaskExecutorBaseSuite) TestArchiveHistory_NoErr_InlineArchiva HistoryArchivedInline: false, }, nil) - domainCacheEntry := 
cache.NewDomainCacheEntryForTest(&persistence.DomainInfo{}, &persistence.DomainConfig{}, false, nil, 0, nil) + domainCacheEntry := cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{}, + &persistence.DomainConfig{}, + false, + nil, + 0, + nil, + nil, + ) err := s.timerQueueTaskExecutorBase.archiveWorkflow(&persistence.TimerTaskInfo{}, s.mockWorkflowExecutionContext, s.mockMutableState, domainCacheEntry) s.NoError(err) } @@ -175,7 +183,15 @@ func (s *timerQueueTaskExecutorBaseSuite) TestArchiveHistory_SendSignalErr() { return req.CallerService == common.HistoryServiceName && !req.AttemptArchiveInline && req.ArchiveRequest.Targets[0] == archiver.ArchiveTargetHistory })).Return(nil, errors.New("failed to send signal")) - domainCacheEntry := cache.NewDomainCacheEntryForTest(&persistence.DomainInfo{}, &persistence.DomainConfig{}, false, nil, 0, nil) + domainCacheEntry := cache.NewDomainCacheEntryForTest( + &persistence.DomainInfo{}, + &persistence.DomainConfig{}, + false, + nil, + 0, + nil, + nil, + ) err := s.timerQueueTaskExecutorBase.archiveWorkflow(&persistence.TimerTaskInfo{}, s.mockWorkflowExecutionContext, s.mockMutableState, domainCacheEntry) s.Error(err) } diff --git a/service/worker/scanner/executions/common/blobstoreIterator.go b/service/worker/scanner/executions/common/blobstoreIterator.go index 2c5382b53b6..21b83dc9dfa 100644 --- a/service/worker/scanner/executions/common/blobstoreIterator.go +++ b/service/worker/scanner/executions/common/blobstoreIterator.go @@ -41,22 +41,22 @@ type ( func NewBlobstoreIterator( client blobstore.Client, keys Keys, -) ExecutionIterator { +) ScanOutputIterator { return &blobstoreIterator{ itr: pagination.NewIterator(keys.MinPage, getBlobstoreFetchPageFn(client, keys)), } } -// Next returns the next Execution -func (i *blobstoreIterator) Next() (*Execution, error) { +// Next returns the next ScanOutputEntity +func (i *blobstoreIterator) Next() (*ScanOutputEntity, error) { exec, err := i.itr.Next() if exec != nil { - 
return exec.(*Execution), err + return exec.(*ScanOutputEntity), err } return nil, err } -// HasNext returns true if there is a next Execution false otherwise +// HasNext returns true if there is a next ScanOutputEntity false otherwise func (i *blobstoreIterator) HasNext() bool { return i.itr.HasNext() } @@ -90,7 +90,7 @@ func getBlobstoreFetchPageFn( if err := ValidateExecution(&soe.Execution); err != nil { return pagination.Page{}, err } - executions = append(executions, &soe.Execution) + executions = append(executions, &soe) } var nextPageToken interface{} = index + 1 if nextPageToken.(int) > keys.MaxPage { diff --git a/service/worker/scanner/executions/common/blobstoreWriter.go b/service/worker/scanner/executions/common/blobstoreWriter.go index 87e6bf54a62..387a80edff9 100644 --- a/service/worker/scanner/executions/common/blobstoreWriter.go +++ b/service/worker/scanner/executions/common/blobstoreWriter.go @@ -36,14 +36,14 @@ type ( blobstoreWriter struct { writer pagination.Writer uuid string - extension string + extension Extension } ) // NewBlobstoreWriter constructs a new blobstore writer func NewBlobstoreWriter( uuid string, - extension string, + extension Extension, client blobstore.Client, flushThreshold int, ) ExecutionWriter { @@ -84,7 +84,7 @@ func (bw *blobstoreWriter) FlushedKeys() *Keys { func getBlobstoreWriteFn( uuid string, - extension string, + extension Extension, client blobstore.Client, ) pagination.WriteFn { return func(page pagination.Page) (pagination.PageToken, error) { @@ -123,6 +123,6 @@ func getBlobstoreShouldFlushFn( } } -func pageNumberToKey(uuid string, extension string, pageNum int) string { +func pageNumberToKey(uuid string, extension Extension, pageNum int) string { return fmt.Sprintf("%v_%v.%v", uuid, pageNum, extension) } diff --git a/service/worker/scanner/executions/common/interfaces.go b/service/worker/scanner/executions/common/interfaces.go index f2a47898d79..3721d1b4aa5 100644 --- 
a/service/worker/scanner/executions/common/interfaces.go +++ b/service/worker/scanner/executions/common/interfaces.go @@ -59,7 +59,17 @@ type ( // or converting store entry to Execution will result in an error after which iterator cannot be used. Next() (*Execution, error) // HasNext indicates if the iterator has a next element. If HasNext is true - // it is guaranteed that Next will return a nil error and a non-nil ExecutionIteratorResult. + // it is guaranteed that Next will return a nil error and a non-nil Execution. + HasNext() bool + } + + // ScanOutputIterator gets ScanOutputEntities from underlying store + ScanOutputIterator interface { + // Next returns the next ScanOutputEntity found. Any error reading from underlying store + // or converting store entry to ScanOutputEntity will result in an error after which iterator cannot be used. + Next() (*ScanOutputEntity, error) + // HasNext indicates if the iterator has a next element. If HasNext is true it is + // guaranteed that Next will return a nil error and non-nil ScanOutputEntity. 
HasNext() bool } diff --git a/service/worker/scanner/executions/common/mocks.go b/service/worker/scanner/executions/common/mocks.go index ec63e1b4a15..2135d465848 100644 --- a/service/worker/scanner/executions/common/mocks.go +++ b/service/worker/scanner/executions/common/mocks.go @@ -325,6 +325,58 @@ func (mr *MockExecutionIteratorMockRecorder) HasNext() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasNext", reflect.TypeOf((*MockExecutionIterator)(nil).HasNext)) } +// MockScanOutputIterator is a mock of ScanOutputIterator interface +type MockScanOutputIterator struct { + ctrl *gomock.Controller + recorder *MockScanOutputIteratorMockRecorder +} + +// MockScanOutputIteratorMockRecorder is the mock recorder for MockScanOutputIterator +type MockScanOutputIteratorMockRecorder struct { + mock *MockScanOutputIterator +} + +// NewMockScanOutputIterator creates a new mock instance +func NewMockScanOutputIterator(ctrl *gomock.Controller) *MockScanOutputIterator { + mock := &MockScanOutputIterator{ctrl: ctrl} + mock.recorder = &MockScanOutputIteratorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockScanOutputIterator) EXPECT() *MockScanOutputIteratorMockRecorder { + return m.recorder +} + +// Next mocks base method +func (m *MockScanOutputIterator) Next() (*ScanOutputEntity, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Next") + ret0, _ := ret[0].(*ScanOutputEntity) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Next indicates an expected call of Next +func (mr *MockScanOutputIteratorMockRecorder) Next() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*MockScanOutputIterator)(nil).Next)) +} + +// HasNext mocks base method +func (m *MockScanOutputIterator) HasNext() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HasNext") + ret0, _ := ret[0].(bool) + return ret0 +} + +// HasNext 
indicates an expected call of HasNext +func (mr *MockScanOutputIteratorMockRecorder) HasNext() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasNext", reflect.TypeOf((*MockScanOutputIterator)(nil).HasNext)) +} + // MockExecutionWriter is a mock of ExecutionWriter interface type MockExecutionWriter struct { ctrl *gomock.Controller diff --git a/service/worker/scanner/executions/common/types.go b/service/worker/scanner/executions/common/types.go index 1b0483030fe..4d5c4c9fbee 100644 --- a/service/worker/scanner/executions/common/types.go +++ b/service/worker/scanner/executions/common/types.go @@ -52,37 +52,37 @@ const ( // CheckResultTypeFailed indicates a failure occurred while attempting to run check CheckResultTypeFailed CheckResultType = "failed" // CheckResultTypeCorrupted indicates check successfully ran and detected a corruption - CheckResultTypeCorrupted = "corrupted" + CheckResultTypeCorrupted CheckResultType = "corrupted" // CheckResultTypeHealthy indicates check successfully ran and detected no corruption - CheckResultTypeHealthy = "healthy" + CheckResultTypeHealthy CheckResultType = "healthy" // FixResultTypeSkipped indicates that fix skipped execution FixResultTypeSkipped FixResultType = "skipped" // FixResultTypeFixed indicates that fix successfully fixed an execution - FixResultTypeFixed = "fixed" + FixResultTypeFixed FixResultType = "fixed" // FixResultTypeFailed indicates that fix attempted to fix an execution but failed to do so - FixResultTypeFailed = "failed" + FixResultTypeFailed FixResultType = "failed" // HistoryExistsInvariantType asserts that history must exist if concrete execution exists HistoryExistsInvariantType InvariantType = "history_exists" // ValidFirstEventInvariantType asserts that the first event in a history must be of a specific form - ValidFirstEventInvariantType = "valid_first_event" + ValidFirstEventInvariantType InvariantType = "valid_first_event" // 
OpenCurrentExecutionInvariantType asserts that an open concrete execution must have a valid current execution - OpenCurrentExecutionInvariantType = "open_current_execution" + OpenCurrentExecutionInvariantType InvariantType = "open_current_execution" // InvariantCollectionMutableState is the collection of invariants relating to mutable state - InvariantCollectionMutableState InvariantCollection = iota + InvariantCollectionMutableState InvariantCollection = 0 // InvariantCollectionHistory is the collection of invariants relating to history - InvariantCollectionHistory + InvariantCollectionHistory InvariantCollection = 1 // SkippedExtension is the extension for files which contain skips SkippedExtension Extension = "skipped" // FailedExtension is the extension for files which contain failures - FailedExtension = "failed" + FailedExtension Extension = "failed" // FixedExtension is the extension for files which contain fixes - FixedExtension = "fixed" + FixedExtension Extension = "fixed" // CorruptedExtension is the extension for files which contain corruptions - CorruptedExtension = "corrupted" + CorruptedExtension Extension = "corrupted" ) // The following are types related to Invariant. 
@@ -212,7 +212,7 @@ type ( UUID string MinPage int MaxPage int - Extension string + Extension Extension } ) diff --git a/service/worker/scanner/executions/common/writerIterator_test.go b/service/worker/scanner/executions/common/writerIterator_test.go index 4cf9f1447fe..ec46f1af2af 100644 --- a/service/worker/scanner/executions/common/writerIterator_test.go +++ b/service/worker/scanner/executions/common/writerIterator_test.go @@ -62,7 +62,7 @@ func (s *WriterIteratorSuite) TestWriterIterator() { pr := NewPersistenceRetryer(getMockExecutionManager(10, 10), nil) pItr := NewPersistenceIterator(pr, executionPageSize, testShardID) uuid := "uuid" - extension := "test" + extension := Extension("test") outputDir, err := ioutil.TempDir("", "TestWriterIterator") s.NoError(err) defer os.RemoveAll(outputDir) @@ -72,18 +72,18 @@ func (s *WriterIteratorSuite) TestWriterIterator() { blobstore, err := filestore.NewFilestoreClient(cfg) s.NoError(err) blobstoreWriter := NewBlobstoreWriter(uuid, extension, blobstore, 10) - var executions []*Execution + var outputs []*ScanOutputEntity for pItr.HasNext() { exec, err := pItr.Next() s.NoError(err) - executions = append(executions, exec) - err = blobstoreWriter.Add(&ScanOutputEntity{ + soe := &ScanOutputEntity{ Execution: *exec, - }) - s.NoError(err) + } + outputs = append(outputs, soe) + s.NoError(blobstoreWriter.Add(soe)) } s.NoError(blobstoreWriter.Flush()) - s.Len(executions, 100) + s.Len(outputs, 100) s.False(pItr.HasNext()) _, err = pItr.Next() s.Equal(pagination.ErrIteratorFinished, err) @@ -91,13 +91,13 @@ func (s *WriterIteratorSuite) TestWriterIterator() { s.Equal(uuid, flushedKeys.UUID) s.Equal(0, flushedKeys.MinPage) s.Equal(9, flushedKeys.MaxPage) - s.Equal("test", flushedKeys.Extension) + s.Equal(Extension("test"), flushedKeys.Extension) blobstoreItr := NewBlobstoreIterator(blobstore, *flushedKeys) i := 0 for blobstoreItr.HasNext() { exec, err := blobstoreItr.Next() s.NoError(err) - s.Equal(*executions[i], *exec) + 
s.Equal(*outputs[i], *exec) i++ } } diff --git a/service/worker/scanner/executions/shard/fixer.go b/service/worker/scanner/executions/shard/fixer.go new file mode 100644 index 00000000000..6f2e04cad85 --- /dev/null +++ b/service/worker/scanner/executions/shard/fixer.go @@ -0,0 +1,146 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package shard + +import ( + "fmt" + + "github.com/pborman/uuid" + + "github.com/uber/cadence/common/blobstore" + "github.com/uber/cadence/service/worker/scanner/executions/common" + "github.com/uber/cadence/service/worker/scanner/executions/invariants" +) + +type ( + fixer struct { + shardID int + itr common.ScanOutputIterator + skippedWriter common.ExecutionWriter + failedWriter common.ExecutionWriter + fixedWriter common.ExecutionWriter + invariantManager common.InvariantManager + } +) + +// NewFixer constructs a new fixer +func NewFixer( + shardID int, + pr common.PersistenceRetryer, + blobstoreClient blobstore.Client, + keys common.Keys, + blobstoreFlushThreshold int, + invariantCollections []common.InvariantCollection, +) common.Fixer { + id := uuid.New() + return &fixer{ + shardID: shardID, + itr: common.NewBlobstoreIterator(blobstoreClient, keys), + skippedWriter: common.NewBlobstoreWriter(id, common.SkippedExtension, blobstoreClient, blobstoreFlushThreshold), + failedWriter: common.NewBlobstoreWriter(id, common.FailedExtension, blobstoreClient, blobstoreFlushThreshold), + fixedWriter: common.NewBlobstoreWriter(id, common.FixedExtension, blobstoreClient, blobstoreFlushThreshold), + invariantManager: invariants.NewInvariantManager(invariantCollections, pr), + } +} + +// Fix scans over all executions in shard and runs invariant fixes per execution. 
+func (f *fixer) Fix() common.ShardFixReport { + result := common.ShardFixReport{ + ShardID: f.shardID, + } + for f.itr.HasNext() { + soe, err := f.itr.Next() + if err != nil { + result.Result.ControlFlowFailure = &common.ControlFlowFailure{ + Info: "blobstore iterator returned error", + InfoDetails: err.Error(), + } + return result + } + fixResult := f.invariantManager.RunFixes(soe.Execution) + result.Stats.ExecutionCount++ + foe := common.FixOutputEntity{ + Execution: soe.Execution, + Input: *soe, + Result: fixResult, + } + switch fixResult.FixResultType { + case common.FixResultTypeFixed: + if err := f.fixedWriter.Add(foe); err != nil { + result.Result.ControlFlowFailure = &common.ControlFlowFailure{ + Info: "blobstore add failed for fixed execution fix", + InfoDetails: err.Error(), + } + return result + } + result.Stats.FixedCount++ + case common.FixResultTypeSkipped: + if err := f.skippedWriter.Add(foe); err != nil { + result.Result.ControlFlowFailure = &common.ControlFlowFailure{ + Info: "blobstore add failed for skipped execution fix", + InfoDetails: err.Error(), + } + return result + } + result.Stats.SkippedCount++ + case common.FixResultTypeFailed: + if err := f.failedWriter.Add(foe); err != nil { + result.Result.ControlFlowFailure = &common.ControlFlowFailure{ + Info: "blobstore add failed for failed execution fix", + InfoDetails: err.Error(), + } + return result + } + result.Stats.FailedCount++ + default: + panic(fmt.Sprintf("unknown FixResultType: %v", fixResult.FixResultType)) + } + } + if err := f.fixedWriter.Flush(); err != nil { + result.Result.ControlFlowFailure = &common.ControlFlowFailure{ + Info: "failed to flush for fixed execution fixes", + InfoDetails: err.Error(), + } + return result + } + if err := f.skippedWriter.Flush(); err != nil { + result.Result.ControlFlowFailure = &common.ControlFlowFailure{ + Info: "failed to flush for skipped execution fixes", + InfoDetails: err.Error(), + } + return result + } + if err := f.failedWriter.Flush(); 
err != nil { + result.Result.ControlFlowFailure = &common.ControlFlowFailure{ + Info: "failed to flush for failed execution fixes", + InfoDetails: err.Error(), + } + return result + } + result.Result.ShardFixKeys = &common.ShardFixKeys{ + Fixed: f.fixedWriter.FlushedKeys(), + Failed: f.failedWriter.FlushedKeys(), + Skipped: f.skippedWriter.FlushedKeys(), + } + return result +} diff --git a/service/worker/scanner/executions/shard/fixer_test.go b/service/worker/scanner/executions/shard/fixer_test.go new file mode 100644 index 00000000000..3a241532314 --- /dev/null +++ b/service/worker/scanner/executions/shard/fixer_test.go @@ -0,0 +1,569 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package shard + +import ( + "errors" + "fmt" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/service/worker/scanner/executions/common" +) + +type FixerSuite struct { + *require.Assertions + suite.Suite + controller *gomock.Controller +} + +func TestFixerSuite(t *testing.T) { + suite.Run(t, new(FixerSuite)) +} + +func (s *FixerSuite) SetupTest() { + s.Assertions = require.New(s.T()) + s.controller = gomock.NewController(s.T()) +} + +func (s *FixerSuite) TearDownTest() { + s.controller.Finish() +} + +func (s *FixerSuite) TestFix_Failure_FirstIteratorError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + mockItr.EXPECT().HasNext().Return(true).Times(1) + mockItr.EXPECT().Next().Return(nil, errors.New("iterator error")).Times(1) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "blobstore iterator returned error", + InfoDetails: "iterator error", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Failure_NonFirstError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + iteratorCallNumber := 0 + mockItr.EXPECT().HasNext().DoAndReturn(func() bool { + return iteratorCallNumber < 5 + }).Times(5) + mockItr.EXPECT().Next().DoAndReturn(func() (*common.ScanOutputEntity, error) { + defer func() { + iteratorCallNumber++ + }() + if iteratorCallNumber < 4 { + return &common.ScanOutputEntity{}, nil + } + return nil, fmt.Errorf("iterator got error on: %v", iteratorCallNumber) + }).Times(5) + mockInvariantManager := common.NewMockInvariantManager(s.controller) + mockInvariantManager.EXPECT().RunFixes(gomock.Any()).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + }).Times(4) + fixedWriter := 
common.NewMockExecutionWriter(s.controller) + fixedWriter.EXPECT().Add(gomock.Any()).Return(nil).Times(4) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + invariantManager: mockInvariantManager, + fixedWriter: fixedWriter, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Stats: common.ShardFixStats{ + ExecutionCount: 4, + FixedCount: 4, + }, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "blobstore iterator returned error", + InfoDetails: "iterator got error on: 4", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Failure_SkippedWriterError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + mockItr.EXPECT().HasNext().Return(true).Times(1) + mockItr.EXPECT().Next().Return(&common.ScanOutputEntity{}, nil).Times(1) + mockInvariantManager := common.NewMockInvariantManager(s.controller) + mockInvariantManager.EXPECT().RunFixes(gomock.Any()).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeSkipped, + }).Times(1) + skippedWriter := common.NewMockExecutionWriter(s.controller) + skippedWriter.EXPECT().Add(gomock.Any()).Return(errors.New("skipped writer error")).Times(1) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + skippedWriter: skippedWriter, + invariantManager: mockInvariantManager, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Stats: common.ShardFixStats{ + ExecutionCount: 1, + }, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "blobstore add failed for skipped execution fix", + InfoDetails: "skipped writer error", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Failure_FailedWriterError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + mockItr.EXPECT().HasNext().Return(true).Times(1) + mockItr.EXPECT().Next().Return(&common.ScanOutputEntity{}, nil).Times(1) + mockInvariantManager := common.NewMockInvariantManager(s.controller) + 
mockInvariantManager.EXPECT().RunFixes(gomock.Any()).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeFailed, + }).Times(1) + failedWriter := common.NewMockExecutionWriter(s.controller) + failedWriter.EXPECT().Add(gomock.Any()).Return(errors.New("failed writer error")).Times(1) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + failedWriter: failedWriter, + invariantManager: mockInvariantManager, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Stats: common.ShardFixStats{ + ExecutionCount: 1, + }, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "blobstore add failed for failed execution fix", + InfoDetails: "failed writer error", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Failure_FixedWriterError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + mockItr.EXPECT().HasNext().Return(true).Times(1) + mockItr.EXPECT().Next().Return(&common.ScanOutputEntity{}, nil).Times(1) + mockInvariantManager := common.NewMockInvariantManager(s.controller) + mockInvariantManager.EXPECT().RunFixes(gomock.Any()).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + }).Times(1) + fixedWriter := common.NewMockExecutionWriter(s.controller) + fixedWriter.EXPECT().Add(gomock.Any()).Return(errors.New("fixed writer error")).Times(1) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + fixedWriter: fixedWriter, + invariantManager: mockInvariantManager, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Stats: common.ShardFixStats{ + ExecutionCount: 1, + }, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "blobstore add failed for fixed execution fix", + InfoDetails: "fixed writer error", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Failure_FixedWriterFlushError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + 
mockItr.EXPECT().HasNext().Return(false).Times(1) + fixedWriter := common.NewMockExecutionWriter(s.controller) + fixedWriter.EXPECT().Flush().Return(errors.New("fix writer flush failed")).Times(1) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + fixedWriter: fixedWriter, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "failed to flush for fixed execution fixes", + InfoDetails: "fix writer flush failed", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Failure_SkippedWriterFlushError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + mockItr.EXPECT().HasNext().Return(false).Times(1) + fixedWriter := common.NewMockExecutionWriter(s.controller) + fixedWriter.EXPECT().Flush().Return(nil) + skippedWriter := common.NewMockExecutionWriter(s.controller) + skippedWriter.EXPECT().Flush().Return(errors.New("skip writer flush failed")).Times(1) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + fixedWriter: fixedWriter, + skippedWriter: skippedWriter, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "failed to flush for skipped execution fixes", + InfoDetails: "skip writer flush failed", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Failure_FailedWriterFlushError() { + mockItr := common.NewMockScanOutputIterator(s.controller) + mockItr.EXPECT().HasNext().Return(false).Times(1) + fixedWriter := common.NewMockExecutionWriter(s.controller) + fixedWriter.EXPECT().Flush().Return(nil) + skippedWriter := common.NewMockExecutionWriter(s.controller) + skippedWriter.EXPECT().Flush().Return(nil).Times(1) + failedWriter := common.NewMockExecutionWriter(s.controller) + failedWriter.EXPECT().Flush().Return(errors.New("fail writer flush failed")).Times(1) + fixer := &fixer{ + shardID: 0, + itr: mockItr, + fixedWriter: 
fixedWriter, + skippedWriter: skippedWriter, + failedWriter: failedWriter, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Result: common.ShardFixResult{ + ControlFlowFailure: &common.ControlFlowFailure{ + Info: "failed to flush for failed execution fixes", + InfoDetails: "fail writer flush failed", + }, + }, + }, result) +} + +func (s *FixerSuite) TestFix_Success() { + mockItr := common.NewMockScanOutputIterator(s.controller) + iteratorCallNumber := 0 + mockItr.EXPECT().HasNext().DoAndReturn(func() bool { + return iteratorCallNumber < 10 + }).Times(11) + mockItr.EXPECT().Next().DoAndReturn(func() (*common.ScanOutputEntity, error) { + defer func() { + iteratorCallNumber++ + }() + switch iteratorCallNumber { + case 0, 1, 2, 3: + return &common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "skipped", + }, + }, nil + case 4, 5: + return &common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "history_missing", + }, + }, nil + case 6: + return &common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "first_history_event", + }, + }, nil + case 7: + return &common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "orphan_execution", + }, + }, nil + case 8, 9: + return &common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "failed", + }, + }, nil + default: + panic("should not get here") + } + }).Times(10) + mockInvariantManager := common.NewMockInvariantManager(s.controller) + mockInvariantManager.EXPECT().RunFixes(common.Execution{ + DomainID: "skipped", + }).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeSkipped, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.HistoryExistsInvariantType, + }, + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.ValidFirstEventInvariantType, + }, + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: 
common.OpenCurrentExecutionInvariantType, + }, + }, + }).Times(4) + mockInvariantManager.EXPECT().RunFixes(common.Execution{ + DomainID: "history_missing", + }).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeFixed, + InvariantType: common.HistoryExistsInvariantType, + Info: "history did not exist", + }, + }, + }).Times(2) + mockInvariantManager.EXPECT().RunFixes(common.Execution{ + DomainID: "first_history_event", + }).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.HistoryExistsInvariantType, + }, + { + FixResultType: common.FixResultTypeFixed, + InvariantType: common.ValidFirstEventInvariantType, + Info: "first event is not valid", + }, + }, + }).Times(1) + mockInvariantManager.EXPECT().RunFixes(common.Execution{ + DomainID: "orphan_execution", + State: persistence.WorkflowStateCreated, + }).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.HistoryExistsInvariantType, + }, + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.ValidFirstEventInvariantType, + }, + { + FixResultType: common.FixResultTypeFixed, + InvariantType: common.OpenCurrentExecutionInvariantType, + Info: "execution was orphan", + }, + }, + }).Times(1) + mockInvariantManager.EXPECT().RunFixes(common.Execution{ + DomainID: "failed", + }).Return(common.ManagerFixResult{ + FixResultType: common.FixResultTypeFailed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeFailed, + InvariantType: common.HistoryExistsInvariantType, + Info: "failed to check if history exists", + }, + }, + }).Times(2) + + mockFixedWriter := common.NewMockExecutionWriter(s.controller) + 
mockFixedWriter.EXPECT().Add(common.FixOutputEntity{ + Execution: common.Execution{ + DomainID: "history_missing", + }, + Input: common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "history_missing", + }, + }, + Result: common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeFixed, + InvariantType: common.HistoryExistsInvariantType, + Info: "history did not exist", + }, + }, + }, + }).Times(2) + mockFixedWriter.EXPECT().Add(common.FixOutputEntity{ + Execution: common.Execution{ + DomainID: "first_history_event", + }, + Input: common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "first_history_event", + }, + }, + Result: common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.HistoryExistsInvariantType, + }, + { + FixResultType: common.FixResultTypeFixed, + InvariantType: common.ValidFirstEventInvariantType, + Info: "first event is not valid", + }, + }, + }, + }).Times(1) + mockFixedWriter.EXPECT().Add(common.FixOutputEntity{ + Execution: common.Execution{ + DomainID: "orphan_execution", + }, + Input: common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "orphan_execution", + }, + }, + Result: common.ManagerFixResult{ + FixResultType: common.FixResultTypeFixed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.HistoryExistsInvariantType, + }, + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.ValidFirstEventInvariantType, + }, + { + FixResultType: common.FixResultTypeFixed, + InvariantType: common.OpenCurrentExecutionInvariantType, + Info: "execution was orphan", + }, + }, + }, + }).Times(1) + mockFailedWriter := common.NewMockExecutionWriter(s.controller) + mockFailedWriter.EXPECT().Add(common.FixOutputEntity{ + Execution: 
common.Execution{ + DomainID: "failed", + }, + Input: common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "failed", + }, + }, + Result: common.ManagerFixResult{ + FixResultType: common.FixResultTypeFailed, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeFailed, + InvariantType: common.HistoryExistsInvariantType, + Info: "failed to check if history exists", + }, + }, + }, + }).Times(2) + mockSkippedWriter := common.NewMockExecutionWriter(s.controller) + mockSkippedWriter.EXPECT().Add(common.FixOutputEntity{ + Execution: common.Execution{ + DomainID: "skipped", + }, + Input: common.ScanOutputEntity{ + Execution: common.Execution{ + DomainID: "skipped", + }, + }, + Result: common.ManagerFixResult{ + FixResultType: common.FixResultTypeSkipped, + FixResults: []common.FixResult{ + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.HistoryExistsInvariantType, + }, + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.ValidFirstEventInvariantType, + }, + { + FixResultType: common.FixResultTypeSkipped, + InvariantType: common.OpenCurrentExecutionInvariantType, + }, + }, + }, + }).Times(4) + mockSkippedWriter.EXPECT().Flush().Return(nil) + mockFailedWriter.EXPECT().Flush().Return(nil) + mockFixedWriter.EXPECT().Flush().Return(nil) + mockSkippedWriter.EXPECT().FlushedKeys().Return(&common.Keys{UUID: "skipped_keys_uuid"}) + mockFailedWriter.EXPECT().FlushedKeys().Return(&common.Keys{UUID: "failed_keys_uuid"}) + mockFixedWriter.EXPECT().FlushedKeys().Return(&common.Keys{UUID: "fixed_keys_uuid"}) + + fixer := &fixer{ + shardID: 0, + invariantManager: mockInvariantManager, + skippedWriter: mockSkippedWriter, + failedWriter: mockFailedWriter, + fixedWriter: mockFixedWriter, + itr: mockItr, + } + result := fixer.Fix() + s.Equal(common.ShardFixReport{ + ShardID: 0, + Stats: common.ShardFixStats{ + ExecutionCount: 10, + FixedCount: 4, + SkippedCount: 4, + FailedCount: 2, + }, + Result: 
common.ShardFixResult{ + ShardFixKeys: &common.ShardFixKeys{ + Fixed: &common.Keys{UUID: "fixed_keys_uuid"}, + Failed: &common.Keys{UUID: "failed_keys_uuid"}, + Skipped: &common.Keys{UUID: "skipped_keys_uuid"}, + }, + }, + }, result) +} diff --git a/service/worker/service.go b/service/worker/service.go index e6b99028cd4..e76f0e54be3 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -165,7 +165,6 @@ func (s *Service) Start() { if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return } - logger := s.GetLogger() logger.Info("worker starting", tag.ComponentWorker)