diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index ee12da2c1df..aca7e1e2b1c 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -119,6 +119,7 @@ var keys = map[Key]string{ HistoryMgrNumConns: "history.historyMgrNumConns", MaximumBufferedEventsBatch: "history.maximumBufferedEventsBatch", ShardUpdateMinInterval: "history.shardUpdateMinInterval", + ShardSyncMinInterval: "history.shardSyncMinInterval", // worker settings WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", @@ -278,6 +279,8 @@ const ( MaximumBufferedEventsBatch // ShardUpdateMinInterval is the minimal time interval which the shard info can be updated ShardUpdateMinInterval + // ShardSyncMinInterval is the minimal time interval at which the shard info should be synced to remote + ShardSyncMinInterval // key for histoworkerry diff --git a/service/history/handler.go b/service/history/handler.go index e71f50b5d03..d98f9ee1cb0 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -136,7 +136,7 @@ func (h *Handler) Start() error { h.domainCache = cache.NewDomainCache(h.metadataMgr, h.GetClusterMetadata(), h.GetMetricsClient(), h.GetLogger()) h.domainCache.Start() h.controller = newShardController(h.Service, h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr, - h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient(), h.publisher) + h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient()) h.metricsClient = h.GetMetricsClient() h.historyEventNotifier = newHistoryEventNotifier(h.GetMetricsClient(), h.config.GetShardID) // events notifier must starts before controller diff --git a/service/history/queueProcessor.go b/service/history/queueProcessor.go index 293e6bdce02..fe79fafe00d 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessor.go @@ -60,6 +60,7 @@ type ( metricsClient metrics.Client 
rateLimiter common.TokenBucket // Read rate limiter ackMgr queueAckMgr + retryPolicy backoff.RetryPolicy // worker coroutines notification workerNotificationChans []chan struct{} @@ -95,6 +96,7 @@ func newQueueProcessorBase(shard ShardContext, options *QueueProcessorOptions, p metricsClient: shard.GetMetricsClient(), logger: logger, ackMgr: queueAckMgr, + retryPolicy: common.CreatePersistanceRetryPolicy(), lastPollTime: time.Time{}, } @@ -248,8 +250,20 @@ func (p *queueProcessorBase) processWithRetry(notificationChan <-chan struct{}, var logger bark.Logger var err error startTime := time.Now() + + retryCount := 0 + op := func() error { + err = p.processor.process(task) + if err != nil && err != ErrTaskRetry { + retryCount++ + logger = p.initializeLoggerForTask(task, logger) + logging.LogTaskProcessingFailedEvent(logger, err) + } + return err + } + ProcessRetryLoop: - for retryCount := 1; retryCount <= p.options.MaxRetryCount(); { + for retryCount < p.options.MaxRetryCount() { select { case <-p.shutdownCh: return @@ -260,25 +274,17 @@ ProcessRetryLoop: default: } - err = p.processor.process(task) + err = backoff.Retry(op, p.retryPolicy, func(err error) bool { + return err != ErrTaskRetry + }) + if err != nil { if err == ErrTaskRetry { p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskStandbyRetryCounter) <-notificationChan - } else { - logger = p.initializeLoggerForTask(task, logger) - logging.LogTaskProcessingFailedEvent(logger, err) - - // it is possible that DomainNotActiveError is thrown - // just keep try for cache.DomainCacheRefreshInterval - // and giveup - if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval { - p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter) - return - } - backoff := time.Duration(retryCount * 100) - time.Sleep(backoff * time.Millisecond) - retryCount++ + } else if _, ok := err.(*workflow.DomainNotActiveError); ok && 
time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval { + p.metricsClient.IncCounter(p.options.MetricScope, metrics.HistoryTaskNotActiveCounter) + return } continue ProcessRetryLoop } diff --git a/service/history/replicatorQueueProcessor.go b/service/history/replicatorQueueProcessor.go index b3f75c29f47..29c2ea820b5 100644 --- a/service/history/replicatorQueueProcessor.go +++ b/service/history/replicatorQueueProcessor.go @@ -22,6 +22,8 @@ package history import ( "errors" + "sync" + "time" "github.com/uber-common/bark" h "github.com/uber/cadence/.gen/go/history" @@ -36,16 +38,20 @@ import ( type ( replicatorQueueProcessorImpl struct { - shard ShardContext - executionMgr persistence.ExecutionManager - historyMgr persistence.HistoryManager - hSerializerFactory persistence.HistorySerializerFactory - replicator messaging.Producer - metricsClient metrics.Client - options *QueueProcessorOptions - logger bark.Logger + currentClusterNamer string + shard ShardContext + executionMgr persistence.ExecutionManager + historyMgr persistence.HistoryManager + hSerializerFactory persistence.HistorySerializerFactory + replicator messaging.Producer + metricsClient metrics.Client + options *QueueProcessorOptions + logger bark.Logger *queueProcessorBase queueAckMgr + + sync.Mutex + lastShardSyncTimestamp time.Time } ) @@ -76,14 +82,15 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc }) processor := &replicatorQueueProcessorImpl{ - shard: shard, - executionMgr: executionMgr, - historyMgr: historyMgr, - hSerializerFactory: hSerializerFactory, - replicator: replicator, - metricsClient: shard.GetMetricsClient(), - options: options, - logger: logger, + currentClusterNamer: shard.GetService().GetClusterMetadata().GetCurrentClusterName(), + shard: shard, + executionMgr: executionMgr, + historyMgr: historyMgr, + hSerializerFactory: hSerializerFactory, + replicator: replicator, + metricsClient: shard.GetMetricsClient(), + options: options, + logger: 
logger, } queueAckMgr := newQueueAckMgr(shard, options, processor, shard.GetReplicatorAckLevel(), logger) @@ -166,7 +173,13 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persi }, } - return p.replicator.Publish(replicationTask) + err = p.replicator.Publish(replicationTask) + if err == nil { + p.Lock() + p.lastShardSyncTimestamp = common.NewRealTimeSource().Now() + p.Unlock() + } + return err } func (p *replicatorQueueProcessorImpl) readTasks(readLevel int64) ([]queueTaskInfo, bool, error) { @@ -195,7 +208,33 @@ func (p *replicatorQueueProcessorImpl) completeTask(taskID int64) error { } func (p *replicatorQueueProcessorImpl) updateAckLevel(ackLevel int64) error { - return p.shard.UpdateReplicatorAckLevel(ackLevel) + err := p.shard.UpdateReplicatorAckLevel(ackLevel) + + // this is a hack: there is no dedicated ticker on the queue processor + // to periodically send out the sync shard message, so it is sent from here + now := common.NewRealTimeSource().Now() + sendSyncTask := false + p.Lock() + if p.lastShardSyncTimestamp.Add(p.shard.GetConfig().ShardSyncMinInterval()).Before(now) { + p.lastShardSyncTimestamp = now + sendSyncTask = true + } + p.Unlock() + + if sendSyncTask { + syncStatusTask := &replicator.ReplicationTask{ + TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus), + SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{ + SourceCluster: common.StringPtr(p.currentClusterNamer), + ShardId: common.Int64Ptr(int64(p.shard.GetShardID())), + Timestamp: common.Int64Ptr(now.UnixNano()), + }, + } + // best effort: the publish error is deliberately ignored + p.replicator.Publish(syncStatusTask) + } + + return err } func (p *replicatorQueueProcessorImpl) getHistory(domainID, workflowID, runID string, firstEventID, diff --git a/service/history/service.go b/service/history/service.go index 958658f7653..a2035bea847 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -99,6 +99,8 @@ type Config struct { 
// ShardUpdateMinInterval the minimal time interval which the shard info can be updated ShardUpdateMinInterval dynamicconfig.DurationPropertyFn + // ShardSyncMinInterval is the minimal time interval at which the shard info should be synced to remote + ShardSyncMinInterval dynamicconfig.DurationPropertyFn // Time to hold a poll request before returning an empty response // right now only used by GetMutableState @@ -154,6 +156,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config { HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.HistoryMgrNumConns, 50), MaximumBufferedEventsBatch: dc.GetIntProperty(dynamicconfig.MaximumBufferedEventsBatch, 100), ShardUpdateMinInterval: dc.GetDurationProperty(dynamicconfig.ShardUpdateMinInterval, 5*time.Minute), + ShardSyncMinInterval: dc.GetDurationProperty(dynamicconfig.ShardSyncMinInterval, 5*time.Minute), // history client: client/history/client.go set the client timeout 30s LongPollExpirationInterval: dc.GetDurationPropertyFilteredByDomain( dynamicconfig.HistoryLongPollExpirationInterval, time.Second*20, diff --git a/service/history/shardContext.go b/service/history/shardContext.go index 94f186d794d..177b1f70a7c 100644 --- a/service/history/shardContext.go +++ b/service/history/shardContext.go @@ -27,9 +27,7 @@ import ( "time" "github.com/uber/cadence/common/cache" - "github.com/uber/cadence/common/messaging" - "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common/logging" "github.com/uber/cadence/common/metrics" @@ -90,7 +88,6 @@ type ( config *Config logger bark.Logger metricsClient metrics.Client - messageProducer messaging.Producer sync.RWMutex lastUpdated time.Time @@ -98,6 +95,8 @@ type ( transferSequenceNumber int64 maxTransferSequenceNumber int64 transferMaxReadLevel int64 + + // exists only in memory standbyClusterCurrentTime map[string]time.Time } ) @@ -588,26 +587,14 @@ func (s *shardContextImpl) updateMaxReadLevelLocked(rl int64) { } 
func (s *shardContextImpl) updateShardInfoLocked() error { - now := time.Now() + var err error + now := common.NewRealTimeSource().Now() if s.lastUpdated.Add(s.config.ShardUpdateMinInterval()).After(now) { return nil } updatedShardInfo := copyShardInfo(s.shardInfo) - if s.messageProducer != nil { - syncStatusTask := &replicator.ReplicationTask{ - TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus), - SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{ - SourceCluster: common.StringPtr(s.currentCluster), - ShardId: common.Int64Ptr(int64(s.shardID)), - Timestamp: common.Int64Ptr(now.UnixNano()), - }, - } - // ignore the error - s.messageProducer.Publish(syncStatusTask) - } - - err := s.shardManager.UpdateShard(&persistence.UpdateShardRequest{ + err = s.shardManager.UpdateShard(&persistence.UpdateShardRequest{ ShardInfo: updatedShardInfo, PreviousRangeID: s.shardInfo.RangeID, }) @@ -677,8 +664,7 @@ func (s *shardContextImpl) GetCurrentTime(cluster string) time.Time { // TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter. 
func acquireShard(shardID int, svc service.Service, shardManager persistence.ShardManager, historyMgr persistence.HistoryManager, executionMgr persistence.ExecutionManager, domainCache cache.DomainCache, - owner string, closeCh chan<- int, config *Config, logger bark.Logger, - metricsClient metrics.Client, messageProducer messaging.Producer) (ShardContext, + owner string, closeCh chan<- int, config *Config, logger bark.Logger, metricsClient metrics.Client) (ShardContext, error) { response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID}) if err0 != nil { @@ -712,7 +698,6 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha shardInfo: updatedShardInfo, closeCh: closeCh, metricsClient: metricsClient, - messageProducer: messageProducer, config: config, standbyClusterCurrentTime: standbyClusterCurrentTime, } diff --git a/service/history/shardController.go b/service/history/shardController.go index 2abb5b90bcf..983faa8dc6d 100644 --- a/service/history/shardController.go +++ b/service/history/shardController.go @@ -27,7 +27,6 @@ import ( "time" "github.com/uber/cadence/common/cache" - "github.com/uber/cadence/common/messaging" "github.com/uber-common/bark" @@ -62,7 +61,6 @@ type ( logger bark.Logger config *Config metricsClient metrics.Client - messageProducer messaging.Producer sync.RWMutex historyShards map[int]*historyShardsItem @@ -71,26 +69,25 @@ type ( historyShardsItem struct { sync.RWMutex - shardID int - service service.Service - shardMgr persistence.ShardManager - historyMgr persistence.HistoryManager - executionMgr persistence.ExecutionManager - domainCache cache.DomainCache - engineFactory EngineFactory - host *membership.HostInfo - engine Engine - config *Config - logger bark.Logger - metricsClient metrics.Client - messageProducer messaging.Producer + shardID int + service service.Service + shardMgr persistence.ShardManager + historyMgr persistence.HistoryManager + executionMgr 
persistence.ExecutionManager + domainCache cache.DomainCache + engineFactory EngineFactory + host *membership.HostInfo + engine Engine + config *Config + logger bark.Logger + metricsClient metrics.Client } ) func newShardController(svc service.Service, host *membership.HostInfo, resolver membership.ServiceResolver, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, domainCache cache.DomainCache, executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, - config *Config, logger bark.Logger, metricsClient metrics.Client, messageProducer messaging.Producer) *shardController { + config *Config, logger bark.Logger, metricsClient metrics.Client) *shardController { logger = logger.WithFields(bark.Fields{ logging.TagWorkflowComponent: logging.TagValueShardController, }) @@ -110,14 +107,13 @@ func newShardController(svc service.Service, host *membership.HostInfo, resolver logger: logger, config: config, metricsClient: metricsClient, - messageProducer: messageProducer, } } func newHistoryShardsItem(shardID int, svc service.Service, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, domainCache cache.DomainCache, executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, host *membership.HostInfo, - config *Config, logger bark.Logger, metricsClient metrics.Client, messageProducer messaging.Producer) (*historyShardsItem, error) { + config *Config, logger bark.Logger, metricsClient metrics.Client) (*historyShardsItem, error) { executionMgr, err := executionMgrFactory.CreateExecutionManager(shardID) if err != nil { @@ -137,8 +133,7 @@ func newHistoryShardsItem(shardID int, svc service.Service, shardMgr persistence logger: logger.WithFields(bark.Fields{ logging.TagHistoryShardID: shardID, }), - metricsClient: metricsClient, - messageProducer: messageProducer, + metricsClient: metricsClient, }, nil } @@ -228,7 +223,7 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) 
(*historyShar if info.Identity() == c.host.Identity() { shardItem, err := newHistoryShardsItem(shardID, c.service, c.shardMgr, c.historyMgr, c.domainCache, - c.executionMgrFactory, c.engineFactory, c.host, c.config, c.logger, c.metricsClient, c.messageProducer) + c.executionMgrFactory, c.engineFactory, c.host, c.config, c.logger, c.metricsClient) if err != nil { return nil, err } @@ -398,7 +393,7 @@ func (i *historyShardsItem) getOrCreateEngine(shardClosedCh chan<- int) (Engine, logging.LogShardEngineCreatingEvent(i.logger, i.host.Identity(), i.shardID) context, err := acquireShard(i.shardID, i.service, i.shardMgr, i.historyMgr, i.executionMgr, i.domainCache, - i.host.Identity(), shardClosedCh, i.config, i.logger, i.metricsClient, i.messageProducer) + i.host.Identity(), shardClosedCh, i.config, i.logger, i.metricsClient) if err != nil { return nil, err } diff --git a/service/history/shardController_test.go b/service/history/shardController_test.go index 5955eb89355..8b1ab2117da 100644 --- a/service/history/shardController_test.go +++ b/service/history/shardController_test.go @@ -90,8 +90,7 @@ func (s *shardControllerSuite) SetupTest() { s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, s.metricsClient, s.logger) s.domainCache = cache.NewDomainCache(s.mockMetadaraMgr, s.mockClusterMetadata, s.metricsClient, s.logger) s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, - s.mockHistoryMgr, s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, - s.metricsClient, s.mockMessaging) + s.mockHistoryMgr, s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) } func (s *shardControllerSuite) TearDownTest() { @@ -345,7 +344,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, 
s.mockShardManager, s.mockHistoryMgr, - s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging) + s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} @@ -438,7 +437,7 @@ func (s *shardControllerSuite) TestRingUpdated() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging) + s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} @@ -518,7 +517,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() { numShards := 4 s.config.NumberOfShards = numShards s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr, - s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging) + s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient) historyEngines := make(map[int]*MockHistoryEngine) for shardID := 0; shardID < numShards; shardID++ { mockEngine := &MockHistoryEngine{} diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index c027d04478e..8bfdaca4e1b 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -66,6 +66,7 @@ type ( timerQueueAckMgr timerQueueAckMgr rateLimiter common.TokenBucket startDelay dynamicconfig.DurationPropertyFn + retryPolicy 
backoff.RetryPolicy // worker coroutines notification workerNotificationChans []chan struct{} @@ -114,6 +115,7 @@ func newTimerQueueProcessorBase(scope int, shard ShardContext, historyService *h lastPollTime: time.Time{}, rateLimiter: common.NewTokenBucket(maxPollRPS(), common.NewRealTimeSource()), startDelay: startDelay, + retryPolicy: common.CreatePersistanceRetryPolicy(), } return base @@ -373,8 +375,20 @@ func (t *timerQueueProcessorBase) processWithRetry(notificationChan <-chan struc var logger bark.Logger var err error startTime := time.Now() + + attempt := 0 + op := func() error { + err = t.timerProcessor.process(task) + if err != nil && err != ErrTaskRetry { + attempt++ + logger = t.initializeLoggerForTask(task, logger) + logging.LogTaskProcessingFailedEvent(logger, err) + } + return err + } + ProcessRetryLoop: - for attempt := 1; attempt <= t.config.TimerTaskMaxRetryCount(); { + for attempt < t.config.TimerTaskMaxRetryCount() { select { case <-t.shutdownCh: return @@ -385,25 +399,17 @@ ProcessRetryLoop: default: } - err = t.timerProcessor.process(task) + err = backoff.Retry(op, t.retryPolicy, func(err error) bool { + return err != ErrTaskRetry + }) + if err != nil { if err == ErrTaskRetry { t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskStandbyRetryCounter) <-notificationChan - } else { - logger = t.initializeLoggerForTask(task, logger) - logging.LogTaskProcessingFailedEvent(logger, err) - - // it is possible that DomainNotActiveError is thrown - // just keep try for cache.DomainCacheRefreshInterval - // and giveup - if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval { - t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskNotActiveCounter) - return - } - backoff := time.Duration(attempt * 100) - time.Sleep(backoff * time.Millisecond) - attempt++ + } else if _, ok := err.(*workflow.DomainNotActiveError); ok && time.Now().Sub(startTime) > cache.DomainCacheRefreshInterval { + 
t.metricsClient.IncCounter(t.scope, metrics.HistoryTaskNotActiveCounter) + return } continue ProcessRetryLoop }