Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: failover trigger should also notify active timer / transfer … #886

Merged
merged 2 commits into from
Jun 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
// requests using the stale entry from cache for up to an hour
DomainCache interface {
common.Daemon
RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, fn CallbackFn)
RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, beforeCallback CallbackFn, afterCallback CallbackFn)
UnregisterDomainChangeCallback(shard int)
GetDomain(name string) (*DomainCacheEntry, error)
GetDomainByID(id string) (*DomainCacheEntry, error)
Expand All @@ -89,7 +89,8 @@ type (

sync.RWMutex
domainNotificationVersion int64
callbacks map[int]CallbackFn
beforeCallbacks map[int]CallbackFn
afterCallbacks map[int]CallbackFn
}

// DomainCacheEntries is DomainCacheEntry slice
Expand Down Expand Up @@ -126,7 +127,8 @@ func NewDomainCache(metadataMgr persistence.MetadataManager, clusterMetadata clu
clusterMetadata: clusterMetadata,
timeSource: common.NewRealTimeSource(),
logger: logger,
callbacks: make(map[int]CallbackFn),
beforeCallbacks: make(map[int]CallbackFn),
afterCallbacks: make(map[int]CallbackFn),
}
}

Expand Down Expand Up @@ -178,14 +180,17 @@ func (c *domainCache) GetAllDomain() map[string]*DomainCacheEntry {
}

// RegisterDomainChangeCallback set a domain change callback
// WARN: the callback function will be triggered by domain cache when holding the domain cache lock,
// WARN: the beforeCallback function will be triggered by domain cache when holding the domain cache lock,
// make sure the callback function does not call into the domain cache again, to avoid a deadlock
func (c *domainCache) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, fn CallbackFn) {
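// afterCallback will be invoked after the domain entry lock has been released; note that the
// catch-up notifications issued by this function are still made while c.Lock() is held.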
func (c *domainCache) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, beforeCallback CallbackFn, afterCallback CallbackFn) {
c.Lock()
defer c.Unlock()

c.callbacks[shard] = fn
c.beforeCallbacks[shard] = beforeCallback
c.afterCallbacks[shard] = afterCallback

// this section is trying to make the shard catch up with domain changes
if c.domainNotificationVersion > initialNotificationVersion {
domains := DomainCacheEntries{}
for _, domain := range c.GetAllDomain() {
Expand All @@ -197,7 +202,8 @@ func (c *domainCache) RegisterDomainChangeCallback(shard int, initialNotificatio
sort.Sort(domains)
for _, domain := range domains {
if domain.notificationVersion >= initialNotificationVersion {
fn(nil, domain)
beforeCallback(nil, domain)
afterCallback(nil, domain)
}
}
}
Expand All @@ -208,7 +214,8 @@ func (c *domainCache) UnregisterDomainChangeCallback(shard int) {
c.Lock()
defer c.Unlock()

delete(c.callbacks, shard)
delete(c.beforeCallbacks, shard)
delete(c.afterCallbacks, shard)
}

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
Expand Down Expand Up @@ -343,9 +350,8 @@ func (c *domainCache) updateIDToDomainCache(id string, record *DomainCacheEntry)
return nil, err
}
entry := elem.(*DomainCacheEntry)
entry.Lock()
defer entry.Unlock()

entry.Lock()
var prevDomain *DomainCacheEntry
triggerCallback := c.clusterMetadata.IsGlobalDomainEnabled() &&
// expiry will be non zero when the entry is initialized / valid
Expand All @@ -368,7 +374,11 @@ func (c *domainCache) updateIDToDomainCache(id string, record *DomainCacheEntry)

nextDomain := entry.duplicate()
if triggerCallback {
c.triggerDomainChangeCallback(prevDomain, nextDomain)
c.triggerDomainBeforeChangeCallback(prevDomain, nextDomain)
}
entry.Unlock()
if triggerCallback {
c.triggerDomainAfterChangeCallback(prevDomain, nextDomain)
}

return nextDomain, nil
Expand Down Expand Up @@ -445,10 +455,18 @@ func (c *domainCache) getDomainByID(id string) (*DomainCacheEntry, error) {
return newEntry, nil
}

func (c *domainCache) triggerDomainChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
// triggerDomainBeforeChangeCallback invokes every registered before-change
// callback with the previous and next domain entries. The cache's read lock
// is held for the whole iteration, so callbacks run while c.RLock() is held.
func (c *domainCache) triggerDomainBeforeChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
c.RLock()
defer c.RUnlock()
// iterate over the per-shard callbacks registered via RegisterDomainChangeCallback
for _, callback := range c.beforeCallbacks {
callback(prevDomain, nextDomain)
}
}

func (c *domainCache) triggerDomainAfterChangeCallback(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
c.RLock()
defer c.RUnlock()
for _, callback := range c.callbacks {
for _, callback := range c.afterCallbacks {
callback(prevDomain, nextDomain)
}
}
Expand Down
6 changes: 3 additions & 3 deletions common/cache/domainCache_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func (_m *DomainCacheMock) GetDomainNotificationVersion() int64 {
return r0
}

// RegisterDomainChangeCallback provides a mock function with given fields: shard, initialNotificationVersion, fn
func (_m *DomainCacheMock) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, fn CallbackFn) {
_m.Called(shard, initialNotificationVersion, fn)
// RegisterDomainChangeCallback provides a mock function with given fields: shard, initialNotificationVersion, beforeCallback, afterCallback
func (_m *DomainCacheMock) RegisterDomainChangeCallback(shard int, initialNotificationVersion int64, beforeCallback CallbackFn, afterCallback CallbackFn) {
_m.Called(shard, initialNotificationVersion, beforeCallback, afterCallback)
}

// Start provides a mock function with given fields:
Expand Down
81 changes: 58 additions & 23 deletions common/cache/domainCache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,26 @@ func (s *domainCacheSuite) TestRegisterCallback_CatchUp() {
s.Nil(s.domainCache.refreshDomains())
s.Equal(domainNotificationVersion, s.domainCache.GetDomainNotificationVersion())

entriesNotification := []*DomainCacheEntry{}
entriesNotificationBefore := []*DomainCacheEntry{}
entriesNotificationAfter := []*DomainCacheEntry{}
// we are testing catching up here, so start from the lowest notification version
currentDomainNotificationVersion := int64(0)
s.domainCache.RegisterDomainChangeCallback(0, currentDomainNotificationVersion, func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Nil(prevDomain)
entriesNotification = append(entriesNotification, nextDomain)
})
s.domainCache.RegisterDomainChangeCallback(
0,
currentDomainNotificationVersion,
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Nil(prevDomain)
entriesNotificationBefore = append(entriesNotificationBefore, nextDomain)
},
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Nil(prevDomain)
entriesNotificationAfter = append(entriesNotificationAfter, nextDomain)
},
)

// the order matters here, should be ordered by notification version
s.Equal([]*DomainCacheEntry{entry1, entry2}, entriesNotification)
s.Equal([]*DomainCacheEntry{entry1, entry2}, entriesNotificationBefore)
s.Equal([]*DomainCacheEntry{entry1, entry2}, entriesNotificationAfter)
}

func (s *domainCacheSuite) TestUpdateCache_ListTrigger() {
Expand Down Expand Up @@ -377,16 +387,28 @@ func (s *domainCacheSuite) TestUpdateCache_ListTrigger() {
entry1New := s.buildEntryFromRecord(domainRecord1New)
domainNotificationVersion++

entriesOld := []*DomainCacheEntry{}
entriesNew := []*DomainCacheEntry{}
entriesOldBefore := []*DomainCacheEntry{}
entriesNewBefore := []*DomainCacheEntry{}
entriesOldAfter := []*DomainCacheEntry{}
entriesNewAfter := []*DomainCacheEntry{}
// we are not testing catching up, so make this really large
currentDomainNotificationVersion := int64(9999999)
s.domainCache.RegisterDomainChangeCallback(0, currentDomainNotificationVersion, func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
entriesOld = append(entriesOld, prevDomain)
entriesNew = append(entriesNew, nextDomain)
})
s.Empty(entriesOld)
s.Empty(entriesNew)
s.domainCache.RegisterDomainChangeCallback(
0,
currentDomainNotificationVersion,
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
entriesOldBefore = append(entriesOldBefore, prevDomain)
entriesNewBefore = append(entriesNewBefore, nextDomain)
},
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
entriesOldAfter = append(entriesOldAfter, prevDomain)
entriesNewAfter = append(entriesNewAfter, nextDomain)
},
)
s.Empty(entriesOldBefore)
s.Empty(entriesNewBefore)
s.Empty(entriesOldAfter)
s.Empty(entriesNewAfter)

s.metadataMgr.On("GetMetadata").Return(&persistence.GetMetadataResponse{NotificationVersion: domainNotificationVersion}, nil).Once()
s.metadataMgr.On("ListDomains", &persistence.ListDomainsRequest{
Expand All @@ -402,8 +424,10 @@ func (s *domainCacheSuite) TestUpdateCache_ListTrigger() {
// the record 1 got updated later, thus a higher notification version.
// making sure we notify from lower to higher version helps the shard keep track of the
// domain change events
s.Equal([]*DomainCacheEntry{entry2Old, entry1Old}, entriesOld)
s.Equal([]*DomainCacheEntry{entry2New, entry1New}, entriesNew)
s.Equal([]*DomainCacheEntry{entry2Old, entry1Old}, entriesOldBefore)
s.Equal([]*DomainCacheEntry{entry2New, entry1New}, entriesNewBefore)
s.Equal([]*DomainCacheEntry{entry2Old, entry1Old}, entriesOldAfter)
s.Equal([]*DomainCacheEntry{entry2New, entry1New}, entriesNewAfter)
}

func (s *domainCacheSuite) TestUpdateCache_GetNotTrigger() {
Expand Down Expand Up @@ -442,19 +466,30 @@ func (s *domainCacheSuite) TestUpdateCache_GetNotTrigger() {
s.Nil(err)
s.Equal(entryOld, entry)

callbackInvoked := false
callbackBeforeInvoked := false
callbackAfterInvoked := false
// we are not testing catching up, so make this really large
currentDomainNotificationVersion := int64(9999999)
s.domainCache.RegisterDomainChangeCallback(0, currentDomainNotificationVersion, func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Equal(entryOld, prevDomain)
s.Equal(entryNew, nextDomain)
callbackInvoked = true
})
s.domainCache.RegisterDomainChangeCallback(
0,
currentDomainNotificationVersion,
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Equal(entryOld, prevDomain)
s.Equal(entryNew, nextDomain)
callbackBeforeInvoked = true
},
func(prevDomain *DomainCacheEntry, nextDomain *DomainCacheEntry) {
s.Equal(entryOld, prevDomain)
s.Equal(entryNew, nextDomain)
callbackAfterInvoked = true
},
)

entry, err = s.domainCache.updateIDToDomainCache(domainRecordNew.Info.ID, entryNew)
s.Nil(err)
s.Equal(entryNew, entry)
s.False(callbackInvoked)
s.False(callbackBeforeInvoked)
s.False(callbackAfterInvoked)
}

func (s *domainCacheSuite) TestGetUpdateCache_ConcurrentAccess() {
Expand Down
8 changes: 5 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ const (
ReplicatorMessages = iota + NumCommonMetrics
ReplicatorFailures
ReplicatorLatency
ReplicatorRetryPercentage
)

// MetricDefs record the metrics for all services
Expand Down Expand Up @@ -801,9 +802,10 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
BufferThrottleCounter: {metricName: "buffer.throttle.count"},
},
Worker: {
ReplicatorMessages: {metricName: "replicator.messages"},
ReplicatorFailures: {metricName: "replicator.errors"},
ReplicatorLatency: {metricName: "replicator.latency"},
ReplicatorMessages: {metricName: "replicator.messages"},
ReplicatorFailures: {metricName: "replicator.errors"},
ReplicatorLatency: {metricName: "replicator.latency"},
ReplicatorRetryPercentage: {metricName: "replicator.retry-percentage", metricType: Gauge},
},
}

Expand Down
48 changes: 37 additions & 11 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,26 +194,52 @@ func (e *historyEngineImpl) Stop() {
}

func (e *historyEngineImpl) registerDomainFailoverCallback() {

failoverPredicate := func(nextDomain *cache.DomainCacheEntry, action func()) {
domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
shardNotificationVersion := e.shard.GetDomainNotificationVersion()
domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName

if nextDomain.IsGlobalDomain() &&
domainFailoverNotificationVersion >= shardNotificationVersion &&
domainActiveCluster == e.currentClusterName {
action()
}
}

// first set the failover callback
e.shard.GetDomainCache().RegisterDomainChangeCallback(
e.shard.GetShardID(),
e.shard.GetDomainCache().GetDomainNotificationVersion(),
// before the domain change, this will be invoked when (most of the time) domain cache is locked
func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
shardNotificationVersion := e.shard.GetDomainNotificationVersion()
domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName

e.logger.Infof("Domain Change Event: Shard: %v, Domain: %v, ID: %v, Failover Notification Version: %v, Active Cluster: %v, Shard Domain Notification Version: %v\n",
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID, domainFailoverNotificationVersion, domainActiveCluster, shardNotificationVersion)
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID,
nextDomain.GetFailoverNotificationVersion(), nextDomain.GetReplicationConfig().ActiveClusterName, e.shard.GetDomainNotificationVersion())

if nextDomain.IsGlobalDomain() &&
domainFailoverNotificationVersion >= shardNotificationVersion &&
domainActiveCluster == e.currentClusterName {
domainID := prevDomain.GetInfo().ID
failoverPredicate(nextDomain, func() {
e.logger.Infof("Domain Failover Start: Shard: %v, Domain: %v, ID: %v\n",
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID)

domainID := nextDomain.GetInfo().ID
e.txProcessor.FailoverDomain(domainID)
e.timerProcessor.FailoverDomain(domainID)
}

})
},
// after the domain change, this will be invoked when domain cache is NOT locked
func(prevDomain *cache.DomainCacheEntry, nextDomain *cache.DomainCacheEntry) {
failoverPredicate(nextDomain, func() {
e.logger.Infof("Domain Failover Notify Active: Shard: %v, Domain: %v, ID: %v\n",
e.shard.GetShardID(), nextDomain.GetInfo().Name, nextDomain.GetInfo().ID)

now := e.shard.GetTimeSource().Now()
// the fake tasks will not actually be used; we just need to make sure
// the slice has length > 0 and carries the correct timestamp, to trigger a db scan
fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}}
fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{VisibilityTimestamp: now}}
e.txProcessor.NotifyNewTask(e.currentClusterName, now, fakeDecisionTask)
e.timerProcessor.NotifyNewTimers(e.currentClusterName, now, fakeDecisionTimeoutTask)
})
e.shard.UpdateDomainNotificationVersion(nextDomain.GetNotificationVersion() + 1)
},
)
Expand Down
2 changes: 2 additions & 0 deletions service/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ func (p *replicationTaskProcessor) getRemainingRetryCount(remainingRetryCount in
numWorker := float64(p.config.ReplicatorConcurrency)
retryPercentage := workerInRetry / numWorker

p.metricsClient.UpdateGauge(metrics.ReplicatorScope, metrics.ReplicatorRetryPercentage, retryPercentage)

min := func(i int64, j int64) int64 {
if i < j {
return i
Expand Down