Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor existing domain API for cross DC, refactor existing domain p… #527

Merged
merged 6 commits into from
Feb 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

538 changes: 518 additions & 20 deletions .gen/go/shared/types.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/service/frontend"
Expand Down Expand Up @@ -100,10 +101,15 @@ func (s *server) startService() common.Daemon {
}

svcCfg := s.cfg.Services[s.name]

params.MetricScope = svcCfg.Metrics.NewScope()
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
params.ClusterMetadata = cluster.NewMetadata(
s.cfg.ClustersInfo.InitialFailoverVersion,
s.cfg.ClustersInfo.FailoverVersionIncrement,
s.cfg.ClustersInfo.CurrentClusterName,
s.cfg.ClustersInfo.ClusterNames,
)

var daemon common.Daemon

Expand Down
74 changes: 41 additions & 33 deletions common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
domainCacheInitialSize = 1024
domainCacheMaxSize = 16 * 1024
domainCacheTTL = time.Hour
domainEntryRefreshInterval = int64(10 * time.Second)
domainEntryRefreshInterval = 10 * time.Second
)

type (
Expand All @@ -44,8 +44,8 @@ type (
// in updating the domain entry every 10 seconds but in the case of a cassandra failure we can still keep on serving
// requests using the stale entry from cache upto an hour
DomainCache interface {
GetDomain(name string) (*persistence.DomainInfo, *persistence.DomainConfig, error)
GetDomainByID(id string) (*persistence.DomainInfo, *persistence.DomainConfig, error)
GetDomain(name string) (*domainCacheEntry, error)
GetDomainByID(id string) (*domainCacheEntry, error)
GetDomainID(name string) (string, error)
}

Expand All @@ -58,9 +58,10 @@ type (
}

domainCacheEntry struct {
info *persistence.DomainInfo
config *persistence.DomainConfig
expiry int64
Info *persistence.DomainInfo
Config *persistence.DomainConfig
ReplicationConfig *persistence.DomainReplicationConfig
expiry time.Time
sync.RWMutex
}
)
Expand All @@ -86,50 +87,44 @@ func newDomainCacheEntry() *domainCacheEntry {

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) GetDomain(name string) (*persistence.DomainInfo, *persistence.DomainConfig, error) {
func (c *domainCache) GetDomain(name string) (*domainCacheEntry, error) {
return c.getDomain(name, "", name, c.cacheByName)
}

// GetDomainByID retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) GetDomainByID(id string) (*persistence.DomainInfo, *persistence.DomainConfig, error) {
func (c *domainCache) GetDomainByID(id string) (*domainCacheEntry, error) {
return c.getDomain(id, id, "", c.cacheByID)
}

// GetDomainID retrieves domainID by using GetDomain
func (c *domainCache) GetDomainID(name string) (string, error) {
info, _, err := c.GetDomain(name)
entry, err := c.GetDomain(name)
if err != nil {
return "", err
}
return info.ID, nil
return entry.Info.ID, nil
}

// GetDomain retrieves the information from the cache if it exists, otherwise retrieves the information from metadata
// store and writes it to the cache with an expiry before returning back
func (c *domainCache) getDomain(key, id, name string, cache Cache) (*persistence.DomainInfo, *persistence.DomainConfig, error) {
now := c.timeSource.Now().UnixNano()
refreshCache := false
var info *persistence.DomainInfo
var config *persistence.DomainConfig
func (c *domainCache) getDomain(key, id, name string, cache Cache) (*domainCacheEntry, error) {
now := c.timeSource.Now()
var result *domainCacheEntry

entry, cacheHit := cache.Get(key).(*domainCacheEntry)
if cacheHit {
// Found the information in the cache, lets check if it needs to be refreshed before returning back
entry.RLock()
info = entry.info
config = entry.config

if entry.expiry == 0 || now >= entry.expiry {
refreshCache = true
if !entry.isExpired(now) {
result = entry.duplicate()
entry.RUnlock()
return result, nil
}
// cache expired, need to refresh
entry.RUnlock()
}

// Found a cache entry and no need to refresh. Return immediately
if cacheHit && !refreshCache {
return info, config, nil
}

// Cache entry not found, Let's create an entry and add it to cache
if !cacheHit {
elem, _ := cache.PutIfNotExist(key, newDomainCacheEntry())
Expand All @@ -141,25 +136,38 @@ func (c *domainCache) getDomain(key, id, name string, cache Cache) (*persistence
defer entry.Unlock()

// Check again under the lock to make sure someone else did not update the entry
if entry.expiry == 0 || now >= entry.expiry {
if entry.isExpired(now) {
response, err := c.metadataMgr.GetDomain(&persistence.GetDomainRequest{
Name: name,
ID: id,
})

// Failed to get domain. Return stale entry if we have one, otherwise just return error
if err != nil {
if entry.expiry > 0 {
return entry.info, entry.config, nil
if !entry.expiry.IsZero() {
return entry, nil
}

return nil, nil, err
return nil, err
}

entry.info = response.Info
entry.config = response.Config
entry.expiry = now + domainEntryRefreshInterval
entry.Info = response.Info
entry.Config = response.Config
entry.ReplicationConfig = response.ReplicationConfig
entry.expiry = now.Add(domainEntryRefreshInterval)
}

return entry.info, entry.config, nil
return entry.duplicate(), nil
}

func (entry *domainCacheEntry) duplicate() *domainCacheEntry {
result := newDomainCacheEntry()
result.Info = entry.Info
result.Config = entry.Config
result.ReplicationConfig = entry.ReplicationConfig
return result
}

func (entry *domainCacheEntry) isExpired(now time.Time) bool {
return entry.expiry.IsZero() || now.After(entry.expiry)
}
25 changes: 15 additions & 10 deletions common/cache/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,24 @@ func (c *lru) putInternal(key interface{}, value interface{}, allowUpdate bool)
elt := c.byKey[key]
if elt != nil {
entry := elt.Value.(*entryImpl)
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
if c.isEntryExpired(entry, time.Now()) {
// Entry has expired
c.deleteInternal(elt)
} else {
existing := entry.value
if allowUpdate {
entry.value = value
if c.ttl != 0 {
entry.createTime = time.Now()
}
}
}

c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
c.byAccess.MoveToFront(elt)
if c.pin {
entry.refCount++
}
return existing, nil
}
return existing, nil
}

entry := &entryImpl{
Expand Down
95 changes: 95 additions & 0 deletions common/cluster/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cluster

type (
// Metadata provides information about clusters
Metadata interface {
GetNextFailoverVersion(int64) int64
// GetCurrentClusterName return the current cluster name
GetCurrentClusterName() string
// GetAllClusterNames return the all cluster names, as a set
GetAllClusterNames() map[string]bool
}

MetadataImpl struct {
// initialFailoverVersion is the initial failover version
initialFailoverVersion int64
// failoverVersionIncrement is the increment of each cluster failover version
failoverVersionIncrement int64
// currentClusterName is the name of the current cluster
currentClusterName string
// clusterNames contains all cluster names, as a set
clusterNames map[string]bool
}
)

// NewMetadata create a new instance of Metadata
func NewMetadata(initialFailoverVersion int64, failoverVersionIncrement int64,
currentClusterName string, clusterNames []string) Metadata {

if initialFailoverVersion < 0 {
panic("Bad initial failover version")
} else if failoverVersionIncrement <= initialFailoverVersion {
panic("Bad failover version increment")
} else if len(currentClusterName) == 0 {
panic("Current cluster name is empty")
} else if len(clusterNames) == 0 {
panic("Total number of all cluster names is 0")
}

clusters := make(map[string]bool)
for _, clusterName := range clusterNames {
if len(clusterName) == 0 {
panic("Cluster name in all cluster names is empty")
}
clusters[clusterName] = true
}
if _, ok := clusters[currentClusterName]; !ok {
panic("Current cluster is not specified in all cluster names")
}

return &MetadataImpl{
initialFailoverVersion: initialFailoverVersion,
failoverVersionIncrement: failoverVersionIncrement,
currentClusterName: currentClusterName,
clusterNames: clusters,
}
}

// GetNextFailoverVersion return the next failover version based on input
func (metadata *MetadataImpl) GetNextFailoverVersion(currentFailoverVersion int64) int64 {
failoverVersion := currentFailoverVersion/metadata.failoverVersionIncrement + metadata.initialFailoverVersion
if failoverVersion < currentFailoverVersion {
return failoverVersion + metadata.failoverVersionIncrement
}
return failoverVersion
}

// GetCurrentClusterName return the current cluster name
func (metadata *MetadataImpl) GetCurrentClusterName() string {
return metadata.currentClusterName
}

// GetAllClusterNames return the all cluster names
func (metadata *MetadataImpl) GetAllClusterNames() map[string]bool {
return metadata.clusterNames
}
Loading