Skip to content

Commit

Permalink
Bug fixes for failover (cadence-workflow#3415)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed May 4, 2021
1 parent df93b3f commit b218e28
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 12 deletions.
5 changes: 0 additions & 5 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,11 +805,6 @@ func (entry *DomainCacheEntry) IsDomainPendingActive() bool {
return entry.failoverEndTime != nil
}

// GetDomainFailoverEndTime returns domain failover end time if it exists.
// The entry's failoverEndTime pointer is returned as-is (not a copy), so the
// result is nil when the field is unset; callers must nil-check before
// dereferencing. A non-nil value is the same condition IsDomainPendingActive
// reports as true.
// NOTE(review): the failover watcher passes the dereferenced value to
// time.Unix(0, ...), which suggests Unix nanoseconds — confirm.
func (entry *DomainCacheEntry) GetDomainFailoverEndTime() *int64 {
return entry.failoverEndTime
}

// GetReplicationPolicy return the derived workflow replication policy
func (entry *DomainCacheEntry) GetReplicationPolicy() ReplicationPolicy {
// frontend guarantee that the clusters always contains the active domain, so if the # of clusters is 1
Expand Down
4 changes: 2 additions & 2 deletions common/domain/failover_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ func (p *failoverWatcherImpl) handleFailoverTimeout(
domain *cache.DomainCacheEntry,
) {

failoverEndTime := domain.GetDomainFailoverEndTime()
if failoverEndTime != nil && p.timeSource.Now().After(time.Unix(0, *failoverEndTime)) {
failoverEndTime := domain.GetFailoverEndTime()
if domain.IsDomainPendingActive() && p.timeSource.Now().After(time.Unix(0, *failoverEndTime)) {
domainID := domain.GetInfo().ID
// force failover the domain without setting the failover timeout
if err := CleanPendingActiveState(
Expand Down
1 change: 1 addition & 0 deletions common/errors/domainNotActiveError.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ func NewDomainPendingActiveError(domainName string, currentCluster string) *work
),
DomainName: domainName,
CurrentCluster: currentCluster,
ActiveCluster: currentCluster,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
const (
constDomainPartition = 0
domainMetadataRecordName = "cadence-domain-metadata"
emptyFailoverEndTime = int64(-1)
emptyFailoverEndTime = int64(0)
)

const (
Expand Down Expand Up @@ -435,7 +435,8 @@ func (m *cassandraMetadataPersistenceV2) GetDomain(request *p.GetDomainRequest)

var responseFailoverEndTime *int64
if failoverEndTime != emptyFailoverEndTime {
responseFailoverEndTime = &failoverEndTime
domainFailoverEndTime := failoverEndTime
responseFailoverEndTime = common.Int64Ptr(domainFailoverEndTime)
}

return &p.InternalGetDomainResponse{
Expand Down Expand Up @@ -515,7 +516,8 @@ func (m *cassandraMetadataPersistenceV2) ListDomains(request *p.ListDomainsReque
domain.ReplicationConfig.Clusters = p.GetOrUseDefaultClusters(m.currentClusterName, domain.ReplicationConfig.Clusters)

if failoverEndTime != emptyFailoverEndTime {
domain.FailoverEndTime = &failoverEndTime
domainFailoverEndTime := failoverEndTime
domain.FailoverEndTime = common.Int64Ptr(domainFailoverEndTime)
}
response.Domains = append(response.Domains, domain)
}
Expand Down
1 change: 1 addition & 0 deletions common/persistence/metadataStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (m *metadataManagerImpl) ListDomains(request *ListDomainsRequest) (*ListDom
ConfigVersion: d.ConfigVersion,
FailoverVersion: d.FailoverVersion,
FailoverNotificationVersion: d.FailoverNotificationVersion,
FailoverEndTime: d.FailoverEndTime,
PreviousFailoverVersion: d.PreviousFailoverVersion,
NotificationVersion: d.NotificationVersion,
})
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3787,7 +3787,7 @@ func (wh *WorkflowHandler) checkOngoingFailover(
if failoverVersion == nil {
failoverVersion = resp.FailoverVersion
}
if failoverVersion != resp.FailoverVersion {
if *failoverVersion != resp.GetFailoverVersion() {
return &gen.BadRequestError{
Message: "Concurrent failover is not allow.",
}
Expand Down
4 changes: 4 additions & 0 deletions service/history/failover/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ func (c *coordinatorImpl) handleFailoverMarkers(
metrics.GracefulFailoverLatency,
now.Sub(time.Unix(0, marker.GetCreationTime())),
)
c.logger.Info("Updated domain from pending-active to active",
tag.WorkflowDomainID(domainID),
tag.FailoverVersion(*marker.FailoverVersion),
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ func (e *historyEngineImpl) registerDomainFailoverCallback() {
e.clusterMetadata.ClusterNameForFailoverVersion(previousFailoverVersion) == e.currentClusterName {
failoverMarkerTasks = append(failoverMarkerTasks, &persistence.FailoverMarkerTask{
VisibilityTimestamp: e.timeSource.Now(),
Version: shardNotificationVersion,
Version: nextDomain.GetFailoverVersion(),
DomainID: nextDomain.GetInfo().ID,
})
}
Expand Down
3 changes: 3 additions & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,7 @@ func (s *contextImpl) AddingPendingFailoverMarker(
}
// domain is active, the marker is expired
if domainEntry.IsDomainActive() || domainEntry.GetFailoverVersion() > marker.GetFailoverVersion() {
s.logger.Info("Skipped out-of-date failover marker", tag.WorkflowDomainName(domainEntry.GetInfo().Name))
return nil
}

Expand All @@ -1314,6 +1315,7 @@ func (s *contextImpl) AddingPendingFailoverMarker(

s.pendingFailoverMarkers = append(s.pendingFailoverMarkers, marker)
if err := s.updateFailoverMarkersInShardInfoLocked(); err != nil {
s.logger.Error("Failed to add failover marker.", tag.Error(err))
return err
}
return s.updateShardInfoLocked()
Expand Down Expand Up @@ -1352,6 +1354,7 @@ func (s *contextImpl) ValidateAndUpdateFailoverMarkers() ([]*replicator.Failover
}
}
if err := s.updateFailoverMarkersInShardInfoLocked(); err != nil {
s.logger.Error("Failed to update failover marker in shard.", tag.Error(err))
return nil, err
}
if err := s.updateShardInfoLocked(); err != nil {
Expand Down

0 comments on commit b218e28

Please sign in to comment.