Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Jan 31, 2018
1 parent 405a4df commit 7e9ae2e
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 156 deletions.
5 changes: 2 additions & 3 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import (
"log"
"time"

"github.com/uber/cadence/common/persistence"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/service/frontend"
Expand Down Expand Up @@ -105,7 +104,7 @@ func (s *server) startService() common.Daemon {
params.MetricScope = svcCfg.Metrics.NewScope()
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
params.ClusterMetadata = persistence.NewClusterMetadata(
params.ClusterMetadata = cluster.NewMetadata(
s.cfg.ClustersInfo.InitialFailoverVersion,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.CurrentClusterName,
Expand Down
40 changes: 21 additions & 19 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
domainCacheInitialSize = 1024
domainCacheMaxSize = 16 * 1024
domainCacheTTL = time.Hour
domainEntryRefreshInterval = int64(10 * time.Second)
domainEntryRefreshInterval = 10 * time.Second
)

type (
Expand All @@ -61,7 +61,7 @@ type (
Info *persistence.DomainInfo
Config *persistence.DomainConfig
ReplicationConfig *persistence.DomainReplicationConfig
expiry int64
expiry time.Time
sync.RWMutex
}
)
Expand Down Expand Up @@ -109,25 +109,15 @@ func (c *domainCache) GetDomainID(name string) (string, error) {
// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCacheEntry, error) {
now := c.timeSource.Now().UnixNano()
now := c.timeSource.Now()
var result *domainCacheEntry

copyEntry := func(input *domainCacheEntry) *domainCacheEntry {
output := newDomainCacheEntry()
output.Info = input.Info
output.Config = input.Config
output.ReplicationConfig = input.ReplicationConfig
return output
}

isCacheExpired := func(entry *domainCacheEntry) bool { return entry.expiry == 0 || now >= entry.expiry }

entry, cacheHit := cache.Get(key).(*domainCacheEntry)
if cacheHit {
// Found the information in the cache, lets check if it needs to be refreshed before returning back
entry.RLock()
if !isCacheExpired(entry) {
result = copyEntry(entry)
if !entry.isExpired(now) {
result = entry.duplicate()
entry.RUnlock()
return result, nil
}
Expand All @@ -146,15 +136,15 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCache
defer entry.Unlock()

// Check again under the lock to make sure someone else did not update the entry
if isCacheExpired(entry) {
if entry.isExpired(now) {
response, err := c.metadataMgr.GetDomain(&persistence.GetDomainRequest{
Name: name,
ID: id,
})

// Failed to get domain. Return stale entry if we have one, otherwise just return error
if err != nil {
if entry.expiry > 0 {
if !entry.expiry.IsZero() {
return entry, nil
}

Expand All @@ -164,8 +154,20 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCache
entry.Info = response.Info
entry.Config = response.Config
entry.ReplicationConfig = response.ReplicationConfig
entry.expiry = now + domainEntryRefreshInterval
entry.expiry = now.Add(domainEntryRefreshInterval)
}

return copyEntry(entry), nil
return entry.duplicate(), nil
}

func (entry *domainCacheEntry) duplicate() *domainCacheEntry {
result := newDomainCacheEntry()
result.Info = entry.Info
result.Config = entry.Config
result.ReplicationConfig = entry.ReplicationConfig
return result
}

func (entry *domainCacheEntry) isExpired(now time.Time) bool {
return entry.expiry.IsZero() || now.After(entry.expiry)
}
25 changes: 15 additions & 10 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,24 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
if c.isEntryExpired(entry, time.Now()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
}
}
}

c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
}
return existing, nil
}
return existing, nil
}

entry := &entryImpl{
Expand Down
95 changes: 95 additions & 0 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cluster

type (
// Metadata provides information about clusters
Metadata interface {
GetNextFailoverVersion(int64) int64
// GetCurrentClusterName return the current cluster name
GetCurrentClusterName() string
// GetAllClusterNames return the all cluster names, as a set
GetAllClusterNames() map[string]bool
}

MetadataImpl struct {
// initialFailoverVersion is the initial failover version
initialFailoverVersion int64
// failoverVersionIncrement is the increment of each cluster failover version
failoverVersionIncrement int64
// currentClusterName is the name of the current cluster
currentClusterName string
// clusterNames contains all cluster names, as a set
clusterNames map[string]bool
}
)

// NewMetadata create a new instance of Metadata
func NewMetadata(initialFailoverVersion int64, failoverVersionIncrement int64,
currentClusterName string, clusterNames []string) Metadata {

if initialFailoverVersion < 0 {
panic("Bad initial failover version")
} else if failoverVersionIncrement <= initialFailoverVersion {
panic("Bad failover version increment")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if len(clusterNames) == 0 {
panic("Total number of all cluster names is 0")
}

clusters := make(map[string]bool)
for _, clusterName := range clusterNames {
if len(clusterName) == 0 {
panic("Cluster name in all cluster names is empty")
}
clusters[clusterName] = true
}
if _, ok := clusters[currentClusterName]; !ok {
panic("Current cluster is not specified in all cluster names")
}

return &MetadataImpl{
initialFailoverVersion: initialFailoverVersion,
failoverVersionIncrement: failoverVersionIncrement,
currentClusterName: currentClusterName,
clusterNames: clusters,
}
}

// GetNextFailoverVersion return the next failover version based on input
func (metadata *MetadataImpl) GetNextFailoverVersion(currentFailoverVersion int64) int64 {
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement + metadata.initialFailoverVersion
if failoverVersion < currentFailoverVersion {
return failoverVersion + metadata.failoverVersionIncrement
}
return failoverVersion
}

// GetCurrentClusterName return the current cluster name
func (metadata *MetadataImpl) GetCurrentClusterName() string {
return metadata.currentClusterName
}

// GetAllClusterNames return the all cluster names
func (metadata *MetadataImpl) GetAllClusterNames() map[string]bool {
return metadata.clusterNames
}
56 changes: 30 additions & 26 deletions common/persistence/cassandraMetadataPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ const (

type (
cassandraMetadataPersistence struct {
session *gocql.Session
clusterMetadata ClusterMetadata
logger bark.Logger
session *gocql.Session
currentClusterName string
logger bark.Logger
}
)

// NewCassandraMetadataPersistence is used to create an instance of HistoryManager implementation
func NewCassandraMetadataPersistence(hosts string, port int, user, password, dc string, keyspace string,
clusterMetadata ClusterMetadata, logger bark.Logger) (MetadataManager,
currentClusterName string, logger bark.Logger) (MetadataManager,
error) {
cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
cluster.Keyspace = keyspace
Expand All @@ -110,9 +110,9 @@ func NewCassandraMetadataPersistence(hosts string, port int, user, password, dc
}

return &cassandraMetadataPersistence{
session: session,
clusterMetadata: clusterMetadata,
logger: logger,
session: session,
currentClusterName: currentClusterName,
logger: logger,
}, nil
}

Expand All @@ -128,11 +128,6 @@ func (m *cassandraMetadataPersistence) Close() {
// delete the orphaned entry from domains table. There is a chance delete entry could fail and we never delete the
// orphaned entry from domains table. We might need a background job to delete those orphaned record.
func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest) (*CreateDomainResponse, error) {
clusterReplicationConfigs := []map[string]interface{}{}
for index := range request.ReplicationConfig.Clusters {
clusterReplicationConfigs = append(clusterReplicationConfigs, request.ReplicationConfig.Clusters[index].serialize())
}

domainUUID := uuid.New()
if err := m.session.Query(templateCreateDomainQuery, domainUUID, request.Name).Exec(); err != nil {
return nil, &workflow.InternalServiceError{
Expand All @@ -151,7 +146,7 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest
request.Config.EmitMetric,
request.ReplicationConfig.ActiveClusterName,
request.ReplicationConfig.FailoverVersion,
clusterReplicationConfigs,
serializeClusterConfigs(request.ReplicationConfig.Clusters),
)

previous := make(map[string]interface{})
Expand Down Expand Up @@ -248,13 +243,9 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
return nil, handleError(request.Name, request.ID, err)
}

replicationConfig.ActiveClusterName = m.clusterMetadata.GetOrUseDefaultActiveCluster(replicationConfig.ActiveClusterName)
for index := range replicationClusters {
clusterReplicationConfig := &ClusterReplicationConfig{}
clusterReplicationConfig.deserialize(replicationClusters[index])
replicationConfig.Clusters = append(replicationConfig.Clusters, clusterReplicationConfig)
}
replicationConfig.Clusters = m.clusterMetadata.GetOrUseDefaultClusters(replicationConfig.Clusters)
replicationConfig.ActiveClusterName = GetOrUseDefaultActiveCluster(m.currentClusterName, replicationConfig.ActiveClusterName)
replicationConfig.Clusters = deserializeClusterConfigs(replicationClusters)
replicationConfig.Clusters = GetOrUseDefaultClusters(m.currentClusterName, replicationConfig.Clusters)

return &GetDomainResponse{
Info: info,
Expand All @@ -265,11 +256,6 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge
}

func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest) error {
clusterReplicationConfigs := []map[string]interface{}{}
for index := range request.ReplicationConfig.Clusters {
clusterReplicationConfigs = append(clusterReplicationConfigs, request.ReplicationConfig.Clusters[index].serialize())
}

var nextVersion int64 = 1
var currentVersion *int64
if request.Version > 0 {
Expand All @@ -286,7 +272,7 @@ func (m *cassandraMetadataPersistence) UpdateDomain(request *UpdateDomainRequest
request.Config.EmitMetric,
request.ReplicationConfig.ActiveClusterName,
request.ReplicationConfig.FailoverVersion,
clusterReplicationConfigs,
serializeClusterConfigs(request.ReplicationConfig.Clusters),
nextVersion,
request.Info.Name,
currentVersion,
Expand Down Expand Up @@ -345,3 +331,21 @@ func (m *cassandraMetadataPersistence) deleteDomain(name, ID string) error {

return nil
}

func serializeClusterConfigs(replicationConfigs []*ClusterReplicationConfig) []map[string]interface{} {
seriaizedReplicationConfigs := []map[string]interface{}{}
for index := range replicationConfigs {
seriaizedReplicationConfigs = append(seriaizedReplicationConfigs, replicationConfigs[index].serialize())
}
return seriaizedReplicationConfigs
}

func deserializeClusterConfigs(replicationConfigs []map[string]interface{}) []*ClusterReplicationConfig {
deseriaizedReplicationConfigs := []*ClusterReplicationConfig{}
for index := range replicationConfigs {
deseriaizedReplicationConfig := &ClusterReplicationConfig{}
deseriaizedReplicationConfig.deserialize(replicationConfigs[index])
deseriaizedReplicationConfigs = append(deseriaizedReplicationConfigs, deseriaizedReplicationConfig)
}
return deseriaizedReplicationConfigs
}
Loading

0 comments on commit 7e9ae2e

Please sign in to comment.