From b6af1fbef850bcbc2911ffd0423ed3746b2917c0 Mon Sep 17 00:00:00 2001 From: Wenquan Xing Date: Tue, 30 Jan 2018 13:36:34 -0800 Subject: [PATCH] address comments --- cmd/server/server.go | 5 +- common/cache/domainCache.go | 40 ++++---- common/cache/lru.go | 25 +++-- common/cluster/metadata.go | 95 +++++++++++++++++++ .../cassandraMetadataPersistence.go | 18 ++-- common/persistence/clusterMetadata.go | 87 +---------------- common/persistence/persistenceTestBase.go | 7 +- common/service/service.go | 8 +- common/service/serviceinterfaces.go | 4 +- host/integration_test.go | 2 +- host/onebox.go | 5 +- service/frontend/handler.go | 2 +- service/frontend/service.go | 2 +- service/history/service.go | 2 +- 14 files changed, 163 insertions(+), 139 deletions(-) create mode 100644 common/cluster/metadata.go diff --git a/cmd/server/server.go b/cmd/server/server.go index 49daf4281d7..821a45477cf 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -24,9 +24,8 @@ import ( "log" "time" - "github.com/uber/cadence/common/persistence" - "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/service/config" "github.com/uber/cadence/service/frontend" @@ -105,7 +104,7 @@ func (s *server) startService() common.Daemon { params.MetricScope = svcCfg.Metrics.NewScope() params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger) params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger) - params.ClusterMetadata = persistence.NewClusterMetadata( + params.ClusterMetadata = cluster.NewMetadata( s.cfg.ClustersInfo.InitialFailoverVersion, s.cfg.ClustersInfo.FailoverVersionIncrement, s.cfg.ClustersInfo.CurrentClusterName, diff --git a/common/cache/domainCache.go b/common/cache/domainCache.go index 8ce840d7517..7c2db772c42 100644 --- a/common/cache/domainCache.go +++ b/common/cache/domainCache.go @@ -34,7 +34,7 @@ const ( domainCacheInitialSize = 1024 domainCacheMaxSize = 16 * 1024 domainCacheTTL = time.Hour - domainEntryRefreshInterval = int64(10 * time.Second) + domainEntryRefreshInterval = 10 * time.Second ) type ( @@ -61,7 +61,7 @@ type ( Info *persistence.DomainInfo Config *persistence.DomainConfig ReplicationConfig *persistence.DomainReplicationConfig - expiry int64 + expiry time.Time sync.RWMutex } ) @@ -109,25 +109,15 @@ func (c *domainCache) GetDomainID(name string) (string, error) { // GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata // store and writes it to the cache with an expiry before returning back func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCacheEntry, error) { - now := c.timeSource.Now().UnixNano() + now := c.timeSource.Now() var result *domainCacheEntry - copyEntry := func(input *domainCacheEntry) *domainCacheEntry { - output := newDomainCacheEntry() - output.Info = input.Info - output.Config = input.Config - output.ReplicationConfig = input.ReplicationConfig - return output - } - - isCacheExpired := func(entry *domainCacheEntry) bool { return entry.expiry == 0 || now >= entry.expiry } - entry, cacheHit := cache.Get(key).(*domainCacheEntry) if cacheHit { // Found the information in the cache, lets check if it needs to be refreshed before returning back entry.RLock() - if !isCacheExpired(entry) { - result = copyEntry(entry) + if !entry.isExpired(now) { + result = entry.duplicate() entry.RUnlock() return result, nil } @@ -146,7 +136,7 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCache defer entry.Unlock() // Check again under the lock to make sure someone else did not update the entry - if isCacheExpired(entry) { + if entry.isExpired(now) { response, err := c.metadataMgr.GetDomain(&persistence.GetDomainRequest{ Name: name, ID: id, @@ -154,7 +144,7 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCache // Failed to get domain. Return stale entry if we have one, otherwise just return error if err != nil { - if entry.expiry > 0 { + if !entry.expiry.IsZero() { return entry, nil } @@ -164,8 +154,20 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCache entry.Info = response.Info entry.Config = response.Config entry.ReplicationConfig = response.ReplicationConfig - entry.expiry = now + domainEntryRefreshInterval + entry.expiry = now.Add(domainEntryRefreshInterval) } - return copyEntry(entry), nil + return entry.duplicate(), nil +} + +func (entry *domainCacheEntry) duplicate() *domainCacheEntry { + result := newDomainCacheEntry() + result.Info = entry.Info + result.Config = entry.Config + result.ReplicationConfig = entry.ReplicationConfig + return result +} + +func (entry *domainCacheEntry) isExpired(now time.Time) bool { + return entry.expiry.IsZero() || now.After(entry.expiry) } diff --git a/common/cache/lru.go b/common/cache/lru.go index cf2caa21221..63c333762e6 100644 --- a/common/cache/lru.go +++ b/common/cache/lru.go @@ -243,19 +243,24 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool) elt := c.byKey[key] if elt != nil { entry := elt.Value.(*entryImpl) - existing := entry.value - if allowUpdate { - entry.value = value - if c.ttl != 0 { - entry.createTime = time.Now() + if c.isEntryExpired(entry, time.Now()) { + // Entry has expired + c.deleteInternal(elt) + } else { + existing := entry.value + if allowUpdate { + entry.value = value + if c.ttl != 0 { + entry.createTime = time.Now() + } } - } - c.byAccess.MoveToFront(elt) - if c.pin { - entry.refCount++ + c.byAccess.MoveToFront(elt) + if c.pin { + entry.refCount++ + } + return existing, nil } - return existing, nil } entry := &entryImpl{ diff --git a/common/cluster/metadata.go b/common/cluster/metadata.go new file mode 100644 index 00000000000..f4c2a7c2efd --- /dev/null +++ b/common/cluster/metadata.go @@ -0,0 +1,95 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package cluster + +type ( + // Metadata provides information about clusters + Metadata interface { + GetNextFailoverVersion(int64) int64 + // GetCurrentClusterName return the current cluster name + GetCurrentClusterName() string + // GetAllClusterNames return the all cluster names, as a set + GetAllClusterNames() map[string]bool + } + + MetadataImpl struct { + // initialFailoverVersion is the initial failover version + initialFailoverVersion int64 + // failoverVersionIncrement is the increment of each cluster failover version + failoverVersionIncrement int64 + // currentClusterName is the name of the current cluster + currentClusterName string + // clusterNames contains all cluster names, as a set + clusterNames map[string]bool + } +) + +// NewMetadata create a new instance of Metadata +func NewMetadata(initialFailoverVersion int64, failoverVersionIncrement int64, + currentClusterName string, clusterNames []string) Metadata { + + if initialFailoverVersion < 0 { + panic("Bad initial failover version") + } else if failoverVersionIncrement <= initialFailoverVersion { + panic("Bad failover version increment") + } else if len(currentClusterName) == 0 { + panic("Current cluster name is empty") + } else if len(clusterNames) == 0 { + panic("Total number of all cluster names is 0") + } + + clusters := make(map[string]bool) + for _, clusterName := range clusterNames { + if len(clusterName) == 0 { + panic("Cluster name in all cluster names is empty") + } + clusters[clusterName] = true + } + if _, ok := clusters[currentClusterName]; !ok { + panic("Current cluster is not specified in all cluster names") + } + + return &MetadataImpl{ + initialFailoverVersion: initialFailoverVersion, + failoverVersionIncrement: failoverVersionIncrement, + currentClusterName: currentClusterName, + clusterNames: clusters, + } +} + +// GetNextFailoverVersion return the next failover version based on input +func (metadata *MetadataImpl) GetNextFailoverVersion(currentFailoverVersion int64) int64 { + failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement + metadata.initialFailoverVersion + if failoverVersion < currentFailoverVersion { + return failoverVersion + metadata.failoverVersionIncrement + } + return failoverVersion +} + +// GetCurrentClusterName return the current cluster name +func (metadata *MetadataImpl) GetCurrentClusterName() string { + return metadata.currentClusterName +} + +// GetAllClusterNames return the all cluster names +func (metadata *MetadataImpl) GetAllClusterNames() map[string]bool { + return metadata.clusterNames +} diff --git a/common/persistence/cassandraMetadataPersistence.go b/common/persistence/cassandraMetadataPersistence.go index 565d7a2e1a5..ad39148eb56 100644 --- a/common/persistence/cassandraMetadataPersistence.go +++ b/common/persistence/cassandraMetadataPersistence.go @@ -87,15 +87,15 @@ const ( type ( cassandraMetadataPersistence struct { - session *gocql.Session - clusterMetadata ClusterMetadata - logger bark.Logger + session *gocql.Session + currentClusterName string + logger bark.Logger } ) // NewCassandraMetadataPersistence is used to create an instance of HistoryManager implementation func NewCassandraMetadataPersistence(hosts string, port int, user, password, dc string, keyspace string, - clusterMetadata ClusterMetadata, logger bark.Logger) (MetadataManager, + currentClusterName string, logger bark.Logger) (MetadataManager, error) { cluster := common.NewCassandraCluster(hosts, port, user, password, dc) cluster.Keyspace = keyspace @@ -110,9 +110,9 @@ func NewCassandraMetadataPersistence(hosts string, port int, user, password, dc } return &cassandraMetadataPersistence{ - session: session, - clusterMetadata: clusterMetadata, - logger: logger, + session: session, + currentClusterName: currentClusterName, + logger: logger, }, nil } @@ -248,13 +248,13 @@ func (m *cassandraMetadataPersistence) GetDomain(request *GetDomainRequest) (*Ge return nil, handleError(request.Name, request.ID, err) } - replicationConfig.ActiveClusterName = m.clusterMetadata.GetOrUseDefaultActiveCluster(replicationConfig.ActiveClusterName) + replicationConfig.ActiveClusterName = GetOrUseDefaultActiveCluster(m.currentClusterName, replicationConfig.ActiveClusterName) for index := range replicationClusters { clusterReplicationConfig := &ClusterReplicationConfig{} clusterReplicationConfig.deserialize(replicationClusters[index]) replicationConfig.Clusters = append(replicationConfig.Clusters, clusterReplicationConfig) } - replicationConfig.Clusters = m.clusterMetadata.GetOrUseDefaultClusters(replicationConfig.Clusters) + replicationConfig.Clusters = GetOrUseDefaultClusters(m.currentClusterName, replicationConfig.Clusters) return &GetDomainResponse{ Info: info, diff --git a/common/persistence/clusterMetadata.go b/common/persistence/clusterMetadata.go index f94c2ef585b..e5c14cc9479 100644 --- a/common/persistence/clusterMetadata.go +++ b/common/persistence/clusterMetadata.go @@ -20,99 +20,20 @@ package persistence -type ( - // ClusterMetadata provides information about clusters - ClusterMetadata interface { - GetNextFailoverVersion(int64) int64 - // GetCurrentClusterName return the current cluster name - GetCurrentClusterName() string - // GetAllClusterNames return the all cluster names, as a set - GetAllClusterNames() map[string]bool - // GetOrUseDefaultActiveCluster return the current cluster name or use the input if valid - GetOrUseDefaultActiveCluster(string) string - // GetOrUseDefaultClusters return the current cluster or use the input if valid - GetOrUseDefaultClusters([]*ClusterReplicationConfig) []*ClusterReplicationConfig - } - - clusterMetadataImpl struct { - // initialFailoverVersion is the initial failover version - initialFailoverVersion int64 - // failoverVersionIncrement is the increment of each cluster failover version - failoverVersionIncrement int64 - // currentClusterName is the name of the current cluster - currentClusterName string - // clusterNames contains all cluster names, as a set - clusterNames map[string]bool - } -) - -// NewClusterMetadata create a new instance of Metadata -func NewClusterMetadata(initialFailoverVersion int64, failoverVersionIncrement int64, - currentClusterName string, clusterNames []string) ClusterMetadata { - - if initialFailoverVersion < 0 { - panic("Bad initial failover version") - } else if failoverVersionIncrement <= initialFailoverVersion { - panic("Bad failover version increment") - } else if len(currentClusterName) == 0 { - panic("Current cluster name is empty") - } else if len(clusterNames) == 0 { - panic("Total number of all cluster names is 0") - } - - clusters := make(map[string]bool) - for _, clusterName := range clusterNames { - if len(clusterName) == 0 { - panic("Cluster name in all cluster names is empty") - } - clusters[clusterName] = true - } - if _, ok := clusters[currentClusterName]; !ok { - panic("Current cluster is not specified in all cluster names") - } - - return &clusterMetadataImpl{ - initialFailoverVersion: initialFailoverVersion, - failoverVersionIncrement: failoverVersionIncrement, - currentClusterName: currentClusterName, - clusterNames: clusters, - } -} - -// GetNextFailoverVersion return the next failover version based on input -func (metadata *clusterMetadataImpl) GetNextFailoverVersion(currentFailoverVersion int64) int64 { - failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement + metadata.initialFailoverVersion - if failoverVersion < currentFailoverVersion { - return failoverVersion + metadata.failoverVersionIncrement - } - return failoverVersion -} - -// GetCurrentClusterName return the current cluster name -func (metadata *clusterMetadataImpl) GetCurrentClusterName() string { - return metadata.currentClusterName -} - -// GetAllClusterNames return the all cluster names -func (metadata *clusterMetadataImpl) GetAllClusterNames() map[string]bool { - return metadata.clusterNames -} - // GetOrUseDefaultActiveCluster return the current cluster name or use the input if valid -func (metadata *clusterMetadataImpl) GetOrUseDefaultActiveCluster(activeClusterName string) string { +func GetOrUseDefaultActiveCluster(currentClusterName string, activeClusterName string) string { if len(activeClusterName) == 0 { - return metadata.GetCurrentClusterName() + return currentClusterName } return activeClusterName } // GetOrUseDefaultClusters return the current cluster or use the input if valid -func (metadata *clusterMetadataImpl) GetOrUseDefaultClusters( - clusters []*ClusterReplicationConfig) []*ClusterReplicationConfig { +func GetOrUseDefaultClusters(currentClusterName string, clusters []*ClusterReplicationConfig) []*ClusterReplicationConfig { if len(clusters) == 0 { return []*ClusterReplicationConfig{ &ClusterReplicationConfig{ - ClusterName: metadata.GetCurrentClusterName(), + ClusterName: currentClusterName, }, } } diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index f5eba00bdd1..603313cabaf 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -34,6 +34,7 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/logging" ) @@ -85,7 +86,7 @@ type ( VisibilityMgr VisibilityManager ShardInfo *ShardInfo TaskIDGenerator TransferTaskIDGenerator - ClusterMetadata ClusterMetadata + ClusterMetadata cluster.Metadata readLevel int64 CassandraTestCluster } @@ -115,7 +116,7 @@ func (g *testTransferTaskIDGenerator) GetNextTransferTaskID() (int64, error) { // SetupWorkflowStoreWithOptions to setup workflow test base func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) { log := bark.NewLoggerFromLogrus(log.New()) - s.ClusterMetadata = NewClusterMetadata( + s.ClusterMetadata = cluster.NewMetadata( testInitialFailoverVersion, testFailoverVersionIncrement, testCurrentClusterName, @@ -154,7 +155,7 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) { } s.MetadataManager, err = NewCassandraMetadataPersistence(options.ClusterHost, options.ClusterPort, options.ClusterUser, - options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, s.ClusterMetadata, log) + options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, s.ClusterMetadata.GetCurrentClusterName(), log) if err != nil { log.Fatal(err) } diff --git a/common/service/service.go b/common/service/service.go index 1c6d4d87e6d..f73b43cd1f5 100644 --- a/common/service/service.go +++ b/common/service/service.go @@ -27,10 +27,10 @@ import ( "github.com/uber/cadence/client" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" - "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service/config" "github.com/uber-common/bark" @@ -52,7 +52,7 @@ type ( RPCFactory common.RPCFactory PProfInitializer common.PProfInitializer CassandraConfig config.Cassandra - ClusterMetadata persistence.ClusterMetadata + ClusterMetadata cluster.Metadata } // RingpopFactory provides a bootstrapped ringpop @@ -78,7 +78,7 @@ type ( metricsScope tally.Scope runtimeMetricsReporter *metrics.RuntimeMetricsReporter metricsClient metrics.Client - clusterMetadata persistence.ClusterMetadata + clusterMetadata cluster.Metadata } ) @@ -211,7 +211,7 @@ func (h *serviceImpl) GetDispatcher() *yarpc.Dispatcher { } // GetClusterMetadata returns the service cluster metadata -func (h *serviceImpl) GetClusterMetadata() persistence.ClusterMetadata { +func (h *serviceImpl) GetClusterMetadata() cluster.Metadata { return h.clusterMetadata } diff --git a/common/service/serviceinterfaces.go b/common/service/serviceinterfaces.go index bf0e6516fe8..237f8a1b9ce 100644 --- a/common/service/serviceinterfaces.go +++ b/common/service/serviceinterfaces.go @@ -23,9 +23,9 @@ package service import ( "github.com/uber-common/bark" "github.com/uber/cadence/client" + "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" - "github.com/uber/cadence/common/persistence" "go.uber.org/yarpc" ) @@ -54,6 +54,6 @@ type ( GetHostInfo() *membership.HostInfo // GetClusterMetadata returns the service cluster metadata - GetClusterMetadata() persistence.ClusterMetadata + GetClusterMetadata() cluster.Metadata } ) diff --git a/host/integration_test.go b/host/integration_test.go index 3958ccbd979..b0ec9cb765d 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -169,7 +169,7 @@ func (s *integrationSuite) TearDownTest() { func (s *integrationSuite) TestIntegrationRegisterGetDomain_AllDefault() { domainName := "some random domain name" clusters := []*workflow.ClusterReplicationConfiguration{} - for _, replicationConfig := range s.ClusterMetadata.GetOrUseDefaultClusters(nil) { + for _, replicationConfig := range persistence.GetOrUseDefaultClusters(s.ClusterMetadata.GetCurrentClusterName(), nil) { clusters = append(clusters, &workflow.ClusterReplicationConfiguration{ ClusterName: common.StringPtr(replicationConfig.ClusterName), }) diff --git a/host/onebox.go b/host/onebox.go index 912cd9561ca..c04ed77705c 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -33,6 +33,7 @@ import ( "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" fecli "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/service/config" @@ -67,7 +68,7 @@ type ( numberOfHistoryShards int numberOfHistoryHosts int logger bark.Logger - clusterMetadata persistence.ClusterMetadata + clusterMetadata cluster.Metadata metadataMgr persistence.MetadataManager shardMgr persistence.ShardManager historyMgr persistence.HistoryManager @@ -86,7 +87,7 @@ type ( ) // NewCadence returns an instance that hosts full cadence in one process -func NewCadence(clusterMetadata persistence.ClusterMetadata, metadataMgr persistence.MetadataManager, shardMgr persistence.ShardManager, +func NewCadence(clusterMetadata cluster.Metadata, metadataMgr persistence.MetadataManager, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, executionMgrFactory persistence.ExecutionManagerFactory, taskMgr persistence.TaskManager, visibilityMgr persistence.VisibilityManager, numberOfHistoryShards, numberOfHistoryHosts int, logger bark.Logger) Cadence { diff --git a/service/frontend/handler.go b/service/frontend/handler.go index ae145e01ac4..d131d2072c2 100644 --- a/service/frontend/handler.go +++ b/service/frontend/handler.go @@ -174,7 +174,7 @@ func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest * } clusters = append(clusters, &persistence.ClusterReplicationConfig{ClusterName: clusterName}) } - clusters = clusterMetadata.GetOrUseDefaultClusters(clusters) + clusters = persistence.GetOrUseDefaultClusters(activeClusterName, clusters) // validate active cluster is also specified in all clusters activeClusterInClusters := false diff --git a/service/frontend/service.go b/service/frontend/service.go index e3cfda4db59..05c1f256ec9 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -78,7 +78,7 @@ func (s *Service) Start() { p.CassandraConfig.Password, p.CassandraConfig.Datacenter, p.CassandraConfig.Keyspace, - p.ClusterMetadata, + p.ClusterMetadata.GetCurrentClusterName(), p.Logger) if err != nil { diff --git a/service/history/service.go b/service/history/service.go index 7e6c3f3384c..eb4eb643d7f 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -173,7 +173,7 @@ func (s *Service) Start() { p.CassandraConfig.Password, p.CassandraConfig.Datacenter, p.CassandraConfig.Keyspace, - p.ClusterMetadata, + p.ClusterMetadata.GetCurrentClusterName(), p.Logger) if err != nil {