Skip to content

Commit

Permalink
Refactor persistence factory implementation (#2544)
Browse files Browse the repository at this point in the history
- Remove FaultInjection from persistence factory interface
- Add provider for data store factory and fault injection data store factory
- Use one data store factory for all store types
  • Loading branch information
yycptt authored Feb 28, 2022
1 parent 9778fa3 commit 3b07030
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 259 deletions.
7 changes: 3 additions & 4 deletions common/persistence/client/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,16 @@ type (

// BeanImpl stores persistence managers
BeanImpl struct {
sync.RWMutex

clusterMetadataManager persistence.ClusterMetadataManager
metadataManager persistence.MetadataManager
taskManager persistence.TaskManager
namespaceReplicationQueue persistence.NamespaceReplicationQueue
shardManager persistence.ShardManager
executionManager persistence.ExecutionManager

factory Factory
faultInjection *FaultInjectionDataStoreFactory

sync.RWMutex
factory Factory
isClosed bool
}
)
Expand Down
233 changes: 41 additions & 192 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,12 @@
package client

import (
"sync"

"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/cassandra"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/resolver"
)

type (
Expand All @@ -58,77 +52,19 @@ type (
NewNamespaceReplicationQueue() (p.NamespaceReplicationQueue, error)
// NewClusterMetadataManager returns a new manager for cluster specific metadata
NewClusterMetadataManager() (p.ClusterMetadataManager, error)
FaultInjection() *FaultInjectionDataStoreFactory
}
// DataStoreFactory is a low level interface to be implemented by a datastore
// Examples of datastores are cassandra, mysql etc
DataStoreFactory interface {
// Close closes the factory
Close()
// NewTaskStore returns a new task store
NewTaskStore() (p.TaskStore, error)
// NewShardStore returns a new shard store
NewShardStore() (p.ShardStore, error)
// NewMetadataStore returns a new metadata store
NewMetadataStore() (p.MetadataStore, error)
// NewExecutionStore returns a new execution store
NewExecutionStore() (p.ExecutionStore, error)
NewQueue(queueType p.QueueType) (p.Queue, error)
// NewClusterMetadataStore returns a new metadata store
NewClusterMetadataStore() (p.ClusterMetadataStore, error)
}
// AbstractDataStoreFactory creates a DataStoreFactory, can be used to implement custom datastore support outside
// of the Temporal core.
AbstractDataStoreFactory interface {
NewFactory(
cfg config.CustomDatastoreConfig,
r resolver.ServiceResolver,
clusterName string,
logger log.Logger,
metricsClient metrics.Client,
) DataStoreFactory
}

// Datastore represents a datastore
Datastore struct {
factory DataStoreFactory
ratelimit quotas.RateLimiter
}
factoryImpl struct {
sync.RWMutex
config *config.Persistence
serializer serialization.Serializer
abstractDataStoreFactory AbstractDataStoreFactory
faultInjection *FaultInjectionDataStoreFactory
metricsClient metrics.Client
logger log.Logger
datastores map[storeType]Datastore
clusterName string
dataStoreFactory DataStoreFactory
config *config.Persistence
serializer serialization.Serializer
metricsClient metrics.Client
logger log.Logger
clusterName string
ratelimiter quotas.RateLimiter
}

storeType int
)

const (
storeTypeHistory storeType = iota + 1
storeTypeTask
storeTypeShard
storeTypeMetadata
storeTypeExecution
storeTypeQueue
storeTypeClusterMetadata
)

var storeTypes = []storeType{
storeTypeHistory,
storeTypeTask,
storeTypeShard,
storeTypeMetadata,
storeTypeExecution,
storeTypeQueue,
storeTypeClusterMetadata,
}

// NewFactory returns an implementation of factory that vends persistence objects based on
// specified configuration. This factory takes as input a config.Persistence object
// which specifies the datastore to be used for a given type of object. This config
Expand All @@ -137,52 +73,35 @@ var storeTypes = []storeType{
// The objects returned by this factory enforce ratelimit and maxconns according to
// given configuration. In addition, all objects will emit metrics automatically
func NewFactory(
dataStoreFactory DataStoreFactory,
cfg *config.Persistence,
r resolver.ServiceResolver,
persistenceMaxQPS dynamicconfig.IntPropertyFn,
ratelimiter quotas.RateLimiter,
serializer serialization.Serializer,
abstractDataStoreFactory AbstractDataStoreFactory,
clusterName string,
metricsClient metrics.Client,
logger log.Logger,
) Factory {
return NewFactoryImpl(cfg, r, persistenceMaxQPS, serializer, abstractDataStoreFactory, clusterName, metricsClient, logger)
}

// Initializes and returns FactoryImpl
func NewFactoryImpl(
cfg *config.Persistence,
r resolver.ServiceResolver,
persistenceMaxQPS dynamicconfig.IntPropertyFn,
serializer serialization.Serializer,
abstractDataStoreFactory AbstractDataStoreFactory,
clusterName string,
metricsClient metrics.Client,
logger log.Logger,
) *factoryImpl {
factory := &factoryImpl{
config: cfg,
serializer: serializer,
abstractDataStoreFactory: abstractDataStoreFactory,
metricsClient: metricsClient,
logger: logger,
clusterName: clusterName,
return &factoryImpl{
dataStoreFactory: dataStoreFactory,
config: cfg,
serializer: serializer,
metricsClient: metricsClient,
logger: logger,
clusterName: clusterName,
ratelimiter: ratelimiter,
}
limiters := buildRateLimiters(cfg, persistenceMaxQPS)
factory.init(clusterName, limiters, r)
return factory
}

// NewTaskManager returns a new task manager
func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
ds := f.datastores[storeTypeTask]
taskStore, err := ds.factory.NewTaskStore()
taskStore, err := f.dataStoreFactory.NewTaskStore()
if err != nil {
return nil, err
}

result := p.NewTaskManager(taskStore, f.serializer)
if ds.ratelimit != nil {
result = p.NewTaskPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
if f.ratelimiter != nil {
result = p.NewTaskPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsClient != nil {
result = p.NewTaskPersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand All @@ -192,14 +111,14 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {

// NewShardManager returns a new shard manager
func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
ds := f.datastores[storeTypeShard]
shardStore, err := ds.factory.NewShardStore()
result := p.NewShardManager(shardStore, f.serializer)
shardStore, err := f.dataStoreFactory.NewShardStore()
if err != nil {
return nil, err
}
if ds.ratelimit != nil {
result = p.NewShardPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)

result := p.NewShardManager(shardStore, f.serializer)
if f.ratelimiter != nil {
result = p.NewShardPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsClient != nil {
result = p.NewShardPersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand All @@ -209,17 +128,14 @@ func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {

// NewMetadataManager returns a new metadata manager
func (f *factoryImpl) NewMetadataManager() (p.MetadataManager, error) {
var err error
var store p.MetadataStore
ds := f.datastores[storeTypeMetadata]
store, err = ds.factory.NewMetadataStore()
store, err := f.dataStoreFactory.NewMetadataStore()
if err != nil {
return nil, err
}

result := p.NewMetadataManagerImpl(store, f.serializer, f.logger, f.clusterName)
if ds.ratelimit != nil {
result = p.NewMetadataPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
if f.ratelimiter != nil {
result = p.NewMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsClient != nil {
result = p.NewMetadataPersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand All @@ -229,17 +145,14 @@ func (f *factoryImpl) NewMetadataManager() (p.MetadataManager, error) {

// NewClusterMetadataManager returns a new cluster metadata manager
func (f *factoryImpl) NewClusterMetadataManager() (p.ClusterMetadataManager, error) {
var err error
var store p.ClusterMetadataStore
ds := f.datastores[storeTypeClusterMetadata]
store, err = ds.factory.NewClusterMetadataStore()
store, err := f.dataStoreFactory.NewClusterMetadataStore()
if err != nil {
return nil, err
}

result := p.NewClusterMetadataManagerImpl(store, f.serializer, f.clusterName, f.logger)
if ds.ratelimit != nil {
result = p.NewClusterMetadataPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
if f.ratelimiter != nil {
result = p.NewClusterMetadataPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsClient != nil {
result = p.NewClusterMetadataPersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand All @@ -249,15 +162,14 @@ func (f *factoryImpl) NewClusterMetadataManager() (p.ClusterMetadataManager, err

// NewExecutionManager returns a new execution manager
func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) {

ds := f.datastores[storeTypeExecution]
store, err := ds.factory.NewExecutionStore()
store, err := f.dataStoreFactory.NewExecutionStore()
if err != nil {
return nil, err
}

result := p.NewExecutionManager(store, f.serializer, f.logger, f.config.TransactionSizeLimit)
if ds.ratelimit != nil {
result = p.NewExecutionPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
if f.ratelimiter != nil {
result = p.NewExecutionPersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsClient != nil {
result = p.NewExecutionPersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand All @@ -266,13 +178,13 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) {
}

func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueue, error) {
ds := f.datastores[storeTypeQueue]
result, err := ds.factory.NewQueue(p.NamespaceReplicationQueueType)
result, err := f.dataStoreFactory.NewQueue(p.NamespaceReplicationQueueType)
if err != nil {
return nil, err
}
if ds.ratelimit != nil {
result = p.NewQueuePersistenceRateLimitedClient(result, ds.ratelimit, f.logger)

if f.ratelimiter != nil {
result = p.NewQueuePersistenceRateLimitedClient(result, f.ratelimiter, f.logger)
}
if f.metricsClient != nil {
result = p.NewQueuePersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand All @@ -283,68 +195,5 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu

// Close closes this factory
func (f *factoryImpl) Close() {
for _, ds := range f.datastores {
ds.factory.Close()
}
}

func (f *factoryImpl) isCassandra() bool {
cfg := f.config
return cfg.DataStores[cfg.VisibilityStore].SQL == nil
}

func (f *factoryImpl) getCassandraConfig() *config.Cassandra {
cfg := f.config
return cfg.DataStores[cfg.VisibilityStore].Cassandra
}

func (f *factoryImpl) init(
clusterName string,
limiters map[string]quotas.RateLimiter,
r resolver.ServiceResolver,
) {

f.datastores = make(map[storeType]Datastore, len(storeTypes))

defaultCfg := f.config.DataStores[f.config.DefaultStore]
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
switch {
case defaultCfg.Cassandra != nil:
defaultDataStore.factory = cassandra.NewFactory(*defaultCfg.Cassandra, r, clusterName, f.logger)
case defaultCfg.SQL != nil:
defaultDataStore.factory = sql.NewFactory(*defaultCfg.SQL, r, clusterName, f.logger)
case defaultCfg.CustomDataStoreConfig != nil:
defaultDataStore.factory = f.abstractDataStoreFactory.NewFactory(*defaultCfg.CustomDataStoreConfig, r, clusterName, f.logger, f.metricsClient)
default:
f.logger.Fatal("invalid config: one of cassandra or sql params must be specified for default data store")
}

if defaultCfg.FaultInjection != nil {
f.faultInjection = NewFaultInjectionDatastoreFactory(defaultCfg.FaultInjection, defaultDataStore.factory)
defaultDataStore.factory = f.faultInjection
}

for _, sType := range storeTypes {
f.datastores[sType] = defaultDataStore
}
}

func buildRateLimiters(
cfg *config.Persistence,
maxQPS dynamicconfig.IntPropertyFn,
) map[string]quotas.RateLimiter {

result := make(map[string]quotas.RateLimiter, len(cfg.DataStores))
for dsName := range cfg.DataStores {
if maxQPS != nil && maxQPS() > 0 {
result[dsName] = quotas.NewDefaultOutgoingRateLimiter(
func() float64 { return float64(maxQPS()) },
)
}
}
return result
}

func (f *factoryImpl) FaultInjection() *FaultInjectionDataStoreFactory {
return f.faultInjection
f.dataStoreFactory.Close()
}
Loading

0 comments on commit 3b07030

Please sign in to comment.