Update replication ack level if response has no task (#3356)
yux0 committed Jun 23, 2020
1 parent ef13c65 commit 0a11569
Showing 4 changed files with 30 additions and 11 deletions.
5 changes: 5 additions & 0 deletions common/service/dynamicconfig/config_mock.go
@@ -69,6 +69,11 @@ func GetDurationPropertyFnFilteredByTaskListInfo(value time.Duration) func(domai
return func(domain string, taskList string, taskType int) time.Duration { return value }
}

+ // GetDurationPropertyFnFilteredByTShardID returns value as DurationPropertyFnWithTaskListInfoFilters
+ func GetDurationPropertyFnFilteredByTShardID(value time.Duration) func(shardID int) time.Duration {
+ return func(shardID int) time.Duration { return value }
+ }

// GetStringPropertyFn returns value as StringPropertyFn
func GetStringPropertyFn(value string) func(opts ...FilterOption) string {
return func(...FilterOption) string { return value }
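For reference, a minimal sketch of how this new mock helper can be used to pin a per-shard dynamic config value in a test (mirroring the SetupTest change further down; the helper name newTestConfigWithShortNoTaskRetry is hypothetical, and it assumes config.NewForTest returns *config.Config as the test file below uses it):

// Hypothetical test helper: override the per-shard duration property with a fixed value.
func newTestConfigWithShortNoTaskRetry() *config.Config {
	cfg := config.NewForTest()
	// Every shard now resolves to the same 1ms retry wait.
	cfg.ReplicationTaskProcessorNoTaskRetryWait = dynamicconfig.GetDurationPropertyFnFilteredByTShardID(1 * time.Millisecond)
	return cfg
}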
10 changes: 5 additions & 5 deletions service/history/replication/task_processor.go
@@ -133,7 +133,7 @@ func NewTaskProcessor(
dlqRetryPolicy: dlqRetryPolicy,
noTaskRetrier: noTaskRetrier,
requestChan: taskFetcher.GetRequestChan(),
- syncShardChan: make(chan *r.SyncShardStatus),
+ syncShardChan: make(chan *r.SyncShardStatus, 1),
done: make(chan struct{}),
lastProcessedMessageID: common.EmptyMessageID,
lastRetrievedMessageID: common.EmptyMessageID,
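The sync-shard channel gains a buffer of one. As a generic Go illustration (not Cadence code), the difference is that a send on an unbuffered channel blocks until a receiver is ready, while a capacity-1 channel accepts one value immediately, so processResponse can hand off the latest status without waiting on the sync-shard loop:

package main

import "fmt"

func main() {
	ch := make(chan string, 1) // capacity 1, as syncShardChan is after this change

	ch <- "sync shard status" // completes immediately: the buffer has room
	// A second send here would block until the buffered value is consumed.

	fmt.Println(<-ch) // the consumer drains the buffered value later
}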
@@ -247,7 +247,7 @@ func (p *taskProcessorImpl) cleanupAckedReplicationTasks() error {
}
}

- p.logger.Info("Cleaning up replication task queue.", tag.ReadLevel(minAckLevel))
+ p.logger.Debug("Cleaning up replication task queue.", tag.ReadLevel(minAckLevel))
p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupCount)
p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope,
metrics.TargetClusterTag(p.currentCluster),
@@ -278,6 +278,9 @@ func (p *taskProcessorImpl) sendFetchMessageRequest() <-chan *r.ReplicationMessa

func (p *taskProcessorImpl) processResponse(response *r.ReplicationMessages) {

+ p.lastProcessedMessageID = response.GetLastRetrievedMessageId()
+ p.lastRetrievedMessageID = response.GetLastRetrievedMessageId()

p.syncShardChan <- response.GetSyncShardStatus()
// Note here we check replication tasks instead of hasMore. The expectation is that in a steady state
// we will receive replication tasks but hasMore is false (meaning that we are always catching up).
@@ -295,9 +298,6 @@ func (p *taskProcessorImpl) processResponse(response *r.ReplicationMessages) {
return
}
}

- p.lastProcessedMessageID = response.GetLastRetrievedMessageId()
- p.lastRetrievedMessageID = response.GetLastRetrievedMessageId()
scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(p.sourceCluster))
scope.UpdateGauge(metrics.LastRetrievedMessageID, float64(p.lastRetrievedMessageID))
p.noTaskRetrier.Reset()
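Taken together, the two hunks above move the ack-level bookkeeping to the top of processResponse, which is the point of this commit: a poll that returns no tasks still advances the ack level. A rough sketch of the method after this change, reconstructed from the diff (only the lines shown in the hunks are verbatim; the empty-response check and per-task handling in the middle are elided and assumed):

func (p *taskProcessorImpl) processResponse(response *r.ReplicationMessages) {
	// Updated first, so a response with no tasks still advances the ack levels.
	p.lastProcessedMessageID = response.GetLastRetrievedMessageId()
	p.lastRetrievedMessageID = response.GetLastRetrievedMessageId()

	p.syncShardChan <- response.GetSyncShardStatus()

	// ... if the response carries no replication tasks, return early here;
	// ... otherwise process each task, returning early on failure (elided).

	scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(p.sourceCluster))
	scope.UpdateGauge(metrics.LastRetrievedMessageID, float64(p.lastRetrievedMessageID))
	p.noTaskRetrier.Reset()
}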
12 changes: 12 additions & 0 deletions service/history/replication/task_processor_test.go
@@ -42,6 +42,7 @@ import (
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/engine"
"github.com/uber/cadence/service/history/shard"
@@ -106,6 +107,7 @@ func (s *taskProcessorSuite) SetupTest() {

s.mockEngine = engine.NewMockEngine(s.controller)
s.config = config.NewForTest()
+ s.config.ReplicationTaskProcessorNoTaskRetryWait = dynamicconfig.GetDurationPropertyFnFilteredByTShardID(1 * time.Millisecond)
s.historyClient = historyservicetest.NewMockClient(s.controller)
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.requestChan = make(chan *request, 10)
@@ -131,6 +133,16 @@ func (s *taskProcessorSuite) TearDownTest() {
s.mockShard.Finish(s.T())
}

+ func (s *taskProcessorSuite) TestProcessResponse_NoTask() {
+ response := &replicator.ReplicationMessages{
+ LastRetrievedMessageId: common.Int64Ptr(100),
+ }
+
+ s.taskProcessor.processResponse(response)
+ s.Equal(int64(100), s.taskProcessor.lastProcessedMessageID)
+ s.Equal(int64(100), s.taskProcessor.lastRetrievedMessageID)
+ }

func (s *taskProcessorSuite) TestSendFetchMessageRequest() {
s.taskProcessor.sendFetchMessageRequest()
requestMessage := <-s.requestChan
14 changes: 8 additions & 6 deletions service/history/replicatorQueueProcessor.go
@@ -25,6 +25,7 @@ package history
import (
ctx "context"
"errors"
"strconv"
"time"

"github.com/uber/cadence/.gen/go/replicator"
@@ -461,21 +462,22 @@ func (p *replicatorQueueProcessorImpl) getTasks(
}
}

// Note this is a very rough indicator of how much the remote DC is behind on this shard.
- p.metricsClient.RecordTimer(
+ replicationScope := p.metricsClient.Scope(
metrics.ReplicatorQueueProcessorScope,
+ metrics.InstanceTag(strconv.Itoa(p.shard.GetShardID())),
+ )
+
+ replicationScope.RecordTimer(
metrics.ReplicationTasksLag,
time.Duration(p.shard.GetTransferMaxReadLevel()-readLevel),
)

- p.metricsClient.RecordTimer(
- metrics.ReplicatorQueueProcessorScope,
+ replicationScope.RecordTimer(
metrics.ReplicationTasksFetched,
time.Duration(len(taskInfoList)),
)

- p.metricsClient.RecordTimer(
- metrics.ReplicatorQueueProcessorScope,
+ replicationScope.RecordTimer(
metrics.ReplicationTasksReturned,
time.Duration(len(replicationTasks)),
)
metrics.ReplicationTasksReturned,
time.Duration(len(replicationTasks)),
)
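This refactor routes all three timers through one scope tagged with the shard ID, so lag and throughput can be broken down per shard. A minimal sketch of the same idea using tally directly rather than Cadence's metrics wrapper (the helper name, tag key, metric names, and parameters are illustrative assumptions; it needs the "strconv", "time", and "github.com/uber-go/tally" imports):

// Hypothetical helper: record the replication timers against a single per-shard scope.
func recordReplicationTimers(rootScope tally.Scope, shardID int, lag, fetched, returned time.Duration) {
	shardScope := rootScope.Tagged(map[string]string{
		"instance": strconv.Itoa(shardID), // assumed tag key, analogous to metrics.InstanceTag
	})
	shardScope.Timer("replication_tasks_lag").Record(lag) // rough indicator of how far the remote DC is behind
	shardScope.Timer("replication_tasks_fetched").Record(fetched)
	shardScope.Timer("replication_tasks_returned").Record(returned)
}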