Skip to content

Commit

Permalink
address commends
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Jul 12, 2018
1 parent 3f62a38 commit 1967272
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 101 deletions.
2 changes: 1 addition & 1 deletion service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (h *Handler) Start() error {
h.domainCache = cache.NewDomainCache(h.metadataMgr, h.GetClusterMetadata(), h.GetMetricsClient(), h.GetLogger())
h.domainCache.Start()
h.controller = newShardController(h.Service, h.GetHostInfo(), hServiceResolver, h.shardManager, h.historyMgr,
h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient(), h.publisher)
h.domainCache, h.executionMgrFactory, h, h.config, h.GetLogger(), h.GetMetricsClient())
h.metricsClient = h.GetMetricsClient()
h.historyEventNotifier = newHistoryEventNotifier(h.GetMetricsClient(), h.config.GetShardID)
// events notifier must starts before controller
Expand Down
16 changes: 0 additions & 16 deletions service/history/historyTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,22 +247,6 @@ func (s *TestShardContext) UpdateTimerClusterAckLevel(cluster string, ackLevel t
return nil
}

// GetLastShardSyncTimestamp get the last shard sync time
func (s *TestShardContext) GetLastShardSyncTimestamp() time.Time {
s.RLock()
defer s.RUnlock()

return s.lastShardSyncTimestamp
}

// UpdateLastShardSyncTimestamp set the last shard sync time
func (s *TestShardContext) UpdateLastShardSyncTimestamp(now time.Time) {
s.Lock()
defer s.Unlock()

s.lastShardSyncTimestamp = now
}

// GetDomainNotificationVersion test implementation
func (s *TestShardContext) GetDomainNotificationVersion() int64 {
s.RLock()
Expand Down
67 changes: 49 additions & 18 deletions service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package history

import (
"errors"
"sync"
"time"

"github.com/uber-common/bark"
h "github.com/uber/cadence/.gen/go/history"
Expand All @@ -36,16 +38,20 @@ import (

type (
replicatorQueueProcessorImpl struct {
shard ShardContext
executionMgr persistence.ExecutionManager
historyMgr persistence.HistoryManager
hSerializerFactory persistence.HistorySerializerFactory
replicator messaging.Producer
metricsClient metrics.Client
options *QueueProcessorOptions
logger bark.Logger
currentClusterNamer string
shard ShardContext
executionMgr persistence.ExecutionManager
historyMgr persistence.HistoryManager
hSerializerFactory persistence.HistorySerializerFactory
replicator messaging.Producer
metricsClient metrics.Client
options *QueueProcessorOptions
logger bark.Logger
*queueProcessorBase
queueAckMgr

sync.Mutex
lastShardSyncTimestamp time.Time
}
)

Expand Down Expand Up @@ -76,14 +82,15 @@ func newReplicatorQueueProcessor(shard ShardContext, replicator messaging.Produc
})

processor := &replicatorQueueProcessorImpl{
shard: shard,
executionMgr: executionMgr,
historyMgr: historyMgr,
hSerializerFactory: hSerializerFactory,
replicator: replicator,
metricsClient: shard.GetMetricsClient(),
options: options,
logger: logger,
currentClusterNamer: shard.GetService().GetClusterMetadata().GetCurrentClusterName(),
shard: shard,
executionMgr: executionMgr,
historyMgr: historyMgr,
hSerializerFactory: hSerializerFactory,
replicator: replicator,
metricsClient: shard.GetMetricsClient(),
options: options,
logger: logger,
}

queueAckMgr := newQueueAckMgr(shard, options, processor, shard.GetReplicatorAckLevel(), logger)
Expand Down Expand Up @@ -168,7 +175,9 @@ func (p *replicatorQueueProcessorImpl) processHistoryReplicationTask(task *persi

err = p.replicator.Publish(replicationTask)
if err == nil {
p.shard.UpdateLastShardSyncTimestamp(common.NewRealTimeSource().Now())
p.Lock()
p.lastShardSyncTimestamp = common.NewRealTimeSource().Now()
p.Unlock()
}
return err
}
Expand Down Expand Up @@ -199,7 +208,29 @@ func (p *replicatorQueueProcessorImpl) completeTask(taskID int64) error {
}

func (p *replicatorQueueProcessorImpl) updateAckLevel(ackLevel int64) error {
return p.shard.UpdateReplicatorAckLevel(ackLevel)
err := p.shard.UpdateReplicatorAckLevel(ackLevel)

// this is a hack, since there is not dedicated ticker on the queue processor
// to periodically send out sync shard message, put it here
now := common.NewRealTimeSource().Now()
p.Lock()
defer p.Unlock()
if p.lastShardSyncTimestamp.Add(p.shard.GetConfig().ShardUpdateMinInterval()).Before(now) {
p.lastShardSyncTimestamp = now

syncStatusTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus),
SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{
SourceCluster: common.StringPtr(p.currentClusterNamer),
ShardId: common.Int64Ptr(int64(p.shard.GetShardID())),
Timestamp: common.Int64Ptr(now.UnixNano()),
},
}
// ignore the error
p.replicator.Publish(syncStatusTask)
}

return err
}

func (p *replicatorQueueProcessorImpl) getHistory(domainID, workflowID, runID string, firstEventID,
Expand Down
40 changes: 1 addition & 39 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
"time"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/messaging"

"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -60,8 +58,6 @@ type (
UpdateTimerAckLevel(ackLevel time.Time) error
GetTimerClusterAckLevel(cluster string) time.Time
UpdateTimerClusterAckLevel(cluster string, ackLevel time.Time) error
GetLastShardSyncTimestamp() time.Time
UpdateLastShardSyncTimestamp(time.Time)
GetDomainNotificationVersion() int64
UpdateDomainNotificationVersion(domainNotificationVersion int64) error
CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
Expand Down Expand Up @@ -92,7 +88,6 @@ type (
config *Config
logger bark.Logger
metricsClient metrics.Client
messageProducer messaging.Producer

sync.RWMutex
lastUpdated time.Time
Expand All @@ -103,7 +98,6 @@ type (

// exist only in memory
standbyClusterCurrentTime map[string]time.Time
lastShardSyncTimestamp time.Time
}
)

Expand Down Expand Up @@ -248,20 +242,6 @@ func (s *shardContextImpl) UpdateDomainNotificationVersion(domainNotificationVer
return s.updateShardInfoLocked()
}

func (s *shardContextImpl) GetLastShardSyncTimestamp() time.Time {
s.RLock()
defer s.RUnlock()

return s.lastShardSyncTimestamp
}

func (s *shardContextImpl) UpdateLastShardSyncTimestamp(now time.Time) {
s.Lock()
defer s.Unlock()

s.lastShardSyncTimestamp = now
}

func (s *shardContextImpl) CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) (
*persistence.CreateWorkflowExecutionResponse, error) {
s.Lock()
Expand Down Expand Up @@ -614,22 +594,6 @@ func (s *shardContextImpl) updateShardInfoLocked() error {
}
updatedShardInfo := copyShardInfo(s.shardInfo)

if s.messageProducer != nil && s.lastShardSyncTimestamp.Add(s.config.ShardUpdateMinInterval()).Before(now) {
syncStatusTask := &replicator.ReplicationTask{
TaskType: replicator.ReplicationTaskType.Ptr(replicator.ReplicationTaskTypeSyncShardStatus),
SyncShardStatusTaskAttributes: &replicator.SyncShardStatusTaskAttributes{
SourceCluster: common.StringPtr(s.currentCluster),
ShardId: common.Int64Ptr(int64(s.shardID)),
Timestamp: common.Int64Ptr(now.UnixNano()),
},
}
// ignore the error
err = s.messageProducer.Publish(syncStatusTask)
if err == nil {
s.lastShardSyncTimestamp = now
}
}

err = s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
ShardInfo: updatedShardInfo,
PreviousRangeID: s.shardInfo.RangeID,
Expand Down Expand Up @@ -700,8 +664,7 @@ func (s *shardContextImpl) GetCurrentTime(cluster string) time.Time {
// TODO: This method has too many parameters. Clean it up. Maybe create a struct to pass in as parameter.
func acquireShard(shardID int, svc service.Service, shardManager persistence.ShardManager,
historyMgr persistence.HistoryManager, executionMgr persistence.ExecutionManager, domainCache cache.DomainCache,
owner string, closeCh chan<- int, config *Config, logger bark.Logger,
metricsClient metrics.Client, messageProducer messaging.Producer) (ShardContext,
owner string, closeCh chan<- int, config *Config, logger bark.Logger, metricsClient metrics.Client) (ShardContext,
error) {
response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID})
if err0 != nil {
Expand Down Expand Up @@ -735,7 +698,6 @@ func acquireShard(shardID int, svc service.Service, shardManager persistence.Sha
shardInfo: updatedShardInfo,
closeCh: closeCh,
metricsClient: metricsClient,
messageProducer: messageProducer,
config: config,
standbyClusterCurrentTime: standbyClusterCurrentTime,
}
Expand Down
39 changes: 17 additions & 22 deletions service/history/shardController.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/messaging"

"github.com/uber-common/bark"

Expand Down Expand Up @@ -62,7 +61,6 @@ type (
logger bark.Logger
config *Config
metricsClient metrics.Client
messageProducer messaging.Producer

sync.RWMutex
historyShards map[int]*historyShardsItem
Expand All @@ -71,26 +69,25 @@ type (

historyShardsItem struct {
sync.RWMutex
shardID int
service service.Service
shardMgr persistence.ShardManager
historyMgr persistence.HistoryManager
executionMgr persistence.ExecutionManager
domainCache cache.DomainCache
engineFactory EngineFactory
host *membership.HostInfo
engine Engine
config *Config
logger bark.Logger
metricsClient metrics.Client
messageProducer messaging.Producer
shardID int
service service.Service
shardMgr persistence.ShardManager
historyMgr persistence.HistoryManager
executionMgr persistence.ExecutionManager
domainCache cache.DomainCache
engineFactory EngineFactory
host *membership.HostInfo
engine Engine
config *Config
logger bark.Logger
metricsClient metrics.Client
}
)

func newShardController(svc service.Service, host *membership.HostInfo, resolver membership.ServiceResolver,
shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager, domainCache cache.DomainCache,
executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory,
config *Config, logger bark.Logger, metricsClient metrics.Client, messageProducer messaging.Producer) *shardController {
config *Config, logger bark.Logger, metricsClient metrics.Client) *shardController {
logger = logger.WithFields(bark.Fields{
logging.TagWorkflowComponent: logging.TagValueShardController,
})
Expand All @@ -110,14 +107,13 @@ func newShardController(svc service.Service, host *membership.HostInfo, resolver
logger: logger,
config: config,
metricsClient: metricsClient,
messageProducer: messageProducer,
}
}

func newHistoryShardsItem(shardID int, svc service.Service, shardMgr persistence.ShardManager,
historyMgr persistence.HistoryManager, domainCache cache.DomainCache,
executionMgrFactory persistence.ExecutionManagerFactory, factory EngineFactory, host *membership.HostInfo,
config *Config, logger bark.Logger, metricsClient metrics.Client, messageProducer messaging.Producer) (*historyShardsItem, error) {
config *Config, logger bark.Logger, metricsClient metrics.Client) (*historyShardsItem, error) {

executionMgr, err := executionMgrFactory.CreateExecutionManager(shardID)
if err != nil {
Expand All @@ -137,8 +133,7 @@ func newHistoryShardsItem(shardID int, svc service.Service, shardMgr persistence
logger: logger.WithFields(bark.Fields{
logging.TagHistoryShardID: shardID,
}),
metricsClient: metricsClient,
messageProducer: messageProducer,
metricsClient: metricsClient,
}, nil
}

Expand Down Expand Up @@ -228,7 +223,7 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar

if info.Identity() == c.host.Identity() {
shardItem, err := newHistoryShardsItem(shardID, c.service, c.shardMgr, c.historyMgr, c.domainCache,
c.executionMgrFactory, c.engineFactory, c.host, c.config, c.logger, c.metricsClient, c.messageProducer)
c.executionMgrFactory, c.engineFactory, c.host, c.config, c.logger, c.metricsClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -398,7 +393,7 @@ func (i *historyShardsItem) getOrCreateEngine(shardClosedCh chan<- int) (Engine,
logging.LogShardEngineCreatingEvent(i.logger, i.host.Identity(), i.shardID)

context, err := acquireShard(i.shardID, i.service, i.shardMgr, i.historyMgr, i.executionMgr, i.domainCache,
i.host.Identity(), shardClosedCh, i.config, i.logger, i.metricsClient, i.messageProducer)
i.host.Identity(), shardClosedCh, i.config, i.logger, i.metricsClient)
if err != nil {
return nil, err
}
Expand Down
9 changes: 4 additions & 5 deletions service/history/shardController_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ func (s *shardControllerSuite) SetupTest() {
s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, s.metricsClient, s.logger)
s.domainCache = cache.NewDomainCache(s.mockMetadaraMgr, s.mockClusterMetadata, s.metricsClient, s.logger)
s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager,
s.mockHistoryMgr, s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger,
s.metricsClient, s.mockMessaging)
s.mockHistoryMgr, s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient)
}

func (s *shardControllerSuite) TearDownTest() {
Expand Down Expand Up @@ -345,7 +344,7 @@ func (s *shardControllerSuite) TestHistoryEngineClosed() {
numShards := 4
s.config.NumberOfShards = numShards
s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr,
s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging)
s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient)
historyEngines := make(map[int]*MockHistoryEngine)
for shardID := 0; shardID < numShards; shardID++ {
mockEngine := &MockHistoryEngine{}
Expand Down Expand Up @@ -438,7 +437,7 @@ func (s *shardControllerSuite) TestRingUpdated() {
numShards := 4
s.config.NumberOfShards = numShards
s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr,
s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging)
s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient)
historyEngines := make(map[int]*MockHistoryEngine)
for shardID := 0; shardID < numShards; shardID++ {
mockEngine := &MockHistoryEngine{}
Expand Down Expand Up @@ -518,7 +517,7 @@ func (s *shardControllerSuite) TestShardControllerClosed() {
numShards := 4
s.config.NumberOfShards = numShards
s.controller = newShardController(s.mockService, s.hostInfo, s.mockServiceResolver, s.mockShardManager, s.mockHistoryMgr,
s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient, s.mockMessaging)
s.domainCache, s.mockExecutionMgrFactory, s.mockEngineFactory, s.config, s.logger, s.metricsClient)
historyEngines := make(map[int]*MockHistoryEngine)
for shardID := 0; shardID < numShards; shardID++ {
mockEngine := &MockHistoryEngine{}
Expand Down

0 comments on commit 1967272

Please sign in to comment.