Few performance optimizations. #960

Merged 2 commits on Jul 12, 2018
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
@@ -119,6 +119,7 @@ var keys = map[Key]string{
HistoryMgrNumConns: "history.historyMgrNumConns",
MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch",
ShardUpdateMinInterval: "history.shardUpdateMinInterval",
ShardSyncMinInterval: "history.shardSyncMinInterval",

// worker settings
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
@@ -278,6 +279,8 @@ const (
MaximumBufferedEventsBatch
// ShardUpdateMinInterval is the minimal time interval which the shard info can be updated
ShardUpdateMinInterval
// ShardSyncMinInterval is the minimal time interval which the shard info should be synced to remote
ShardSyncMinInterval

// key for worker

2 changes: 1 addition & 1 deletion service/history/handler.go
@@ -136,7 +136,7 @@ func (h *Handler) Start() error {
h.domainCache = cache.NewDomainCache(h.metadataMgr, h.GetClusterMetadata(), h.GetMetricsClient(), h.GetLogger())
h.domainCache.Start()
h.controller = newShardController(h.Service, h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr,
h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient(), h.publisher)
h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient())
h.metricsClient = h.GetMetricsClient()
h.historyEventNotifier = newHistoryEventNotifier(h.GetMetricsClient(), h.config.GetShardID)
// events notifier must start before controller
38 changes: 22 additions & 16 deletions service/history/queueProcessor.go
@@ -60,6 +60,7 @@ type (
metricsClient metrics.Client
rateLimiter common.TokenBucket // Read rate limiter
ackMgr queueAckMgr
retryPolicy backoff.RetryPolicy

// worker coroutines notification
workerNotificationChans []chan struct{}
@@ -95,6 +96,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p
metricsClient: shard.GetMetricsClient(),
logger: logger,
ackMgr: queueAckMgr,
retryPolicy: common.CreatePersistanceRetryPolicy(),
lastPollTime: time.Time{},
}

@@ -248,8 +250,20 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{},
var logger bark.Logger
var err error
startTime := time.Now()

retryCount := 0
op := func() error {
err = p.processor.process(task)
if err != nil && err != ErrTaskRetry {
retryCount++
logger = p.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)
}
return err
}

ProcessRetryLoop:
for retryCount := 1; retryCount <= p.options.MaxRetryCount(); {
for retryCount < p.options.MaxRetryCount() {
select {
case <-p.shutdownCh:
return
@@ -260,25 +274,17 @@
default:
}

err = p.processor.process(task)
err = backoff.Retry(op, p.retryPolicy, func(err error) bool {
return err != ErrTaskRetry
})

if err != nil {
if err == ErrTaskRetry {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter)
<-notificationChan
} else {
logger = p.initializeLoggerForTask(task, logger)
logging.LogTaskProcessingFailedEvent(logger, err)

// it is possible that DomainNotActiveError is thrown
// just keep try for cache.DomainCacheRefreshInterval
// and giveup
if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
backoff := time.Duration(retryCount * 100)
time.Sleep(backoff * time.Millisecond)
retryCount++
} else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval {
p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter)
return
}
continue ProcessRetryLoop
}
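The rewritten processWithRetry above moves the per-attempt sleep into backoff.Retry, and the predicate passed as the third argument keeps ErrTaskRetry out of the backoff loop, so standby tasks wait on the notification channel instead of burning retries. As a rough illustration of that retry-with-predicate shape, here is a small self-contained Go sketch; the retry helper, its policy struct, and the sample errors are stand-ins invented for this example, not the cadence common/backoff API.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// policy describes a simple exponential backoff: a hypothetical stand-in
// for the retry policy returned by common.CreatePersistanceRetryPolicy().
type policy struct {
	initial     time.Duration
	multiplier  float64
	maxAttempts int
}

// retry keeps invoking op until it succeeds, the attempt budget is spent,
// or isRetryable reports that the error should not be retried.
func retry(op func() error, p policy, isRetryable func(error) bool) error {
	backoff := p.initial
	var err error
	for attempt := 0; attempt < p.maxAttempts; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err // hand the error back to the caller immediately
		}
		time.Sleep(backoff)
		backoff = time.Duration(float64(backoff) * p.multiplier)
	}
	return err
}

// errTaskRetry mirrors ErrTaskRetry: it means "wait for new state", so the
// backoff loop should not spend attempts on it.
var errTaskRetry = errors.New("task retry")

func main() {
	attempts := 0
	op := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient persistence error")
		}
		return nil
	}

	err := retry(op, policy{initial: 50 * time.Millisecond, multiplier: 2, maxAttempts: 5},
		func(err error) bool { return err != errTaskRetry })
	fmt.Println(attempts, err) // 3 <nil>
}
```

Keeping the non-retryable check inside the retry helper means the caller only decides which errors are worth retrying, while the policy owns how long to wait between attempts.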
75 changes: 57 additions & 18 deletions service/history/replicatorQueueProcessor.go
@@ -22,6 +22,8 @@ package history

import (
"errors"
"sync"
"time"

"github.com/uber-common/bark"
h "github.com/uber/cadence/.gen/go/history"
@@ -36,16 +38,20 @@ import (

type (
replicatorQueueProcessorImpl struct {
shard ShardContext
executionMgr persistence.ExecutionManager
historyMgr persistence.HistoryManager
hSerializerFactory persistence.HistorySerializerFactory
replicator messaging.Producer
metricsClient metrics.Client
options *QueueProcessorOptions
logger bark.Logger
currentClusterNamer string
shard ShardContext
executionMgr persistence.ExecutionManager
historyMgr persistence.HistoryManager
hSerializerFactory persistence.HistorySerializerFactory
replicator messaging.Producer
metricsClient metrics.Client
options *QueueProcessorOptions
logger bark.Logger
*queueProcessorBase
queueAckMgr

sync.Mutex
lastShardSyncTimestamp time.Time
}
)

@@ -76,14 +82,15 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc
})

processor := &replicatorQueueProcessorImpl{
shard: shard,
executionMgr: executionMgr,
historyMgr: historyMgr,
hSerializerFactory: hSerializerFactory,
replicator: replicator,
metricsClient: shard.GetMetricsClient(),
options: options,
logger: logger,
currentClusterNamer: shard.GetService().GetClusterMetadata().GetCurrentClusterName(),
shard: shard,
executionMgr: executionMgr,
historyMgr: historyMgr,
hSerializerFactory: hSerializerFactory,
replicator: replicator,
metricsClient: shard.GetMetricsClient(),
options: options,
logger: logger,
}

queueAckMgr := newQueueAckMgr(shard, options, processor, shard.GetReplicatorAckLevel(), logger)
@@ -166,7 +173,13 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persi
},
}

return p.replicator.Publish(replicationTask)
err = p.replicator.Publish(replicationTask)
if err == nil {
p.Lock()
p.lastShardSyncTimestamp = common.NewRealTimeSource().Now()
p.Unlock()
}
return err
}

func (p *replicatorQueueProcessorImpl) readTasks(readLevel int64) ([]queueTaskInfo, bool, error) {
@@ -195,7 +208,33 @@ func (p *replicatorQueueProcessorImpl) completeTask(taskID int64) error {
}

func (p *replicatorQueueProcessorImpl) updateAckLevel(ackLevel int64) error {
return p.shard.UpdateReplicatorAckLevel(ackLevel)
err := p.shard.UpdateReplicatorAckLevel(ackLevel)

// this is a hack, since there is no dedicated ticker on the queue processor
// to periodically send out the sync shard message, so it is done here
now := common.NewRealTimeSource().Now()
sendSyncTask := false
p.Lock()
if p.lastShardSyncTimestamp.Add(p.shard.GetConfig().ShardSyncMinInterval()).Before(now) {
p.lastShardSyncTimestamp = now
sendSyncTask = true
}
p.Unlock()

if sendSyncTask {
syncStatusTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus),
SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{
SourceCluster: common.StringPtr(p.currentClusterNamer),
ShardId: common.Int64Ptr(int64(p.shard.GetShardID())),
Timestamp: common.Int64Ptr(now.UnixNano()),
},
}
// ignore the error
p.replicator.Publish(syncStatusTask)
}

return err
}

func (p *replicatorQueueProcessorImpl) getHistory(domainID, workflowID, runID string, firstEventID,
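updateAckLevel now piggybacks the sync-shard-status replication task on ack-level updates and throttles it with lastShardSyncTimestamp, so at most one message is published per ShardSyncMinInterval. The sketch below isolates that mutex-guarded throttle; syncThrottle and publish are illustrative names for this example only, not the processor's actual types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// syncThrottle allows an action at most once per interval; it mirrors the
// lastShardSyncTimestamp/Mutex pair guarding the sync-shard publish.
type syncThrottle struct {
	sync.Mutex
	last     time.Time
	interval time.Duration
}

// shouldSend reports whether enough time has passed since the last send,
// and records the new timestamp when it has.
func (t *syncThrottle) shouldSend(now time.Time) bool {
	t.Lock()
	defer t.Unlock()
	if t.last.Add(t.interval).Before(now) {
		t.last = now
		return true
	}
	return false
}

func main() {
	throttle := &syncThrottle{interval: 5 * time.Minute}

	publish := func(now time.Time) {
		// stand-in for p.replicator.Publish(syncStatusTask); errors ignored
		fmt.Println("sync shard status at", now.Format(time.RFC3339))
	}

	now := time.Now()
	for i := 0; i < 3; i++ {
		if throttle.shouldSend(now) {
			publish(now)
		}
		now = now.Add(3 * time.Minute) // ticks at 0m, 3m, 6m: only 0m and 6m cross the 5m bar
	}
}
```

Checking and updating the timestamp under the same lock keeps concurrent ack-level updates from both deciding to publish within a single interval.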
3 changes: 3 additions & 0 deletions service/history/service.go
@@ -99,6 +99,8 @@ type Config struct {

// ShardUpdateMinInterval the minimal time interval which the shard info can be updated
ShardUpdateMinInterval dynamicconfig.DurationPropertyFn
// ShardSyncMinInterval the minimal time interval which the shard info should be synced to remote
ShardSyncMinInterval dynamicconfig.DurationPropertyFn

// Time to hold a poll request before returning an empty response
// right now only used by GetMutableState
@@ -154,6 +156,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns, 50),
MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100),
ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute),
ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute),
// history client: client/history/client.go set the client timeout 30s
LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain(
dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20,
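ShardSyncMinInterval is wired the same way as ShardUpdateMinInterval: NewConfig stores a dynamicconfig.DurationPropertyFn, and callers such as the replicator queue processor invoke it (p.shard.GetConfig().ShardSyncMinInterval()) each time they need the value, so a dynamic-config override can take effect without a restart. A minimal sketch of that property-as-function pattern follows, with a plain map standing in for the real dynamic config collection; the helper names here are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// DurationPropertyFn is evaluated at call time, so updates to the backing
// dynamic config take effect without restarting the service.
type DurationPropertyFn func() time.Duration

// getDurationProperty returns a property that falls back to defaultValue
// when the lookup has no override; the map stands in for the real store.
func getDurationProperty(store map[string]time.Duration, key string, defaultValue time.Duration) DurationPropertyFn {
	return func() time.Duration {
		if v, ok := store[key]; ok {
			return v
		}
		return defaultValue
	}
}

func main() {
	store := map[string]time.Duration{}
	shardSyncMinInterval := getDurationProperty(store, "history.shardSyncMinInterval", 5*time.Minute)

	fmt.Println(shardSyncMinInterval()) // 5m0s (default)

	store["history.shardSyncMinInterval"] = time.Minute // simulate a runtime override
	fmt.Println(shardSyncMinInterval())                 // 1m0s
}
```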
27 changes: 6 additions & 21 deletions service/history/shardContext.go
@@ -27,9 +27,7 @@ import (
"time"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/messaging"

"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/metrics"
@@ -90,14 +88,15 @@ type (
config *Config
logger bark.Logger
metricsClient metrics.Client
messageProducer messaging.Producer

sync.RWMutex
lastUpdated time.Time
shardInfo *persistence.ShardInfo
transferSequenceNumber int64
maxTransferSequenceNumber int64
transferMaxReadLevel int64

// exist only in memory
standbyClusterCurrentTime map[string]time.Time
}
)
@@ -588,26 +587,14 @@ func (s *shardContextImpl) updateMaxReadLevelLocked(rl int64) {
}

func (s *shardContextImpl) updateShardInfoLocked() error {
now := time.Now()
var err error
now := common.NewRealTimeSource().Now()
if s.lastUpdated.Add(s.config.ShardUpdateMinInterval()).After(now) {
return nil
}
updatedShardInfo := copyShardInfo(s.shardInfo)

if s.messageProducer != nil {
syncStatusTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus),
SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{
SourceCluster: common.StringPtr(s.currentCluster),
ShardId: common.Int64Ptr(int64(s.shardID)),
Timestamp: common.Int64Ptr(now.UnixNano()),
},
}
// ignore the error
s.messageProducer.Publish(syncStatusTask)
}

err := s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
err = s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
ShardInfo: updatedShardInfo,
PreviousRangeID: s.shardInfo.RangeID,
})
@@ -677,8 +664,7 @@ func (s *shardContextImpl) GetCurrentTime(cluster string) time.Time {
// TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter.
func acquireShard(shardID int, svc service.Service, shardManager persistence.ShardManager,
historyMgr persistence.HistoryManager, executionMgr persistence.ExecutionManager, domainCache cache.DomainCache,
owner string, closeCh chan<- int, config *Config, logger bark.Logger,
metricsClient metrics.Client, messageProducer messaging.Producer) (ShardContext,
owner string, closeCh chan<- int, config *Config, logger bark.Logger, metricsClient metrics.Client) (ShardContext,
error) {
response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID})
if err0 != nil {
@@ -712,7 +698,6 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha
shardInfo: updatedShardInfo,
closeCh: closeCh,
metricsClient: metricsClient,
messageProducer: messageProducer,
config: config,
standbyClusterCurrentTime: standbyClusterCurrentTime,
}