Skip to content

Commit

Permalink
Replication task generation delay (#3465)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed Aug 20, 2020
1 parent 2e58738 commit 00c60df
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
3 changes: 3 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ var keys = map[Key]string{
ReplicationTaskProcessorStartWaitJitterCoefficient: "history.ReplicationTaskProcessorStartWaitJitterCoefficient",
ReplicationTaskProcessorHostQPS: "history.ReplicationTaskProcessorHostQPS",
ReplicationTaskProcessorShardQPS: "history.ReplicationTaskProcessorShardQPS",
ReplicationTaskGenerationQPS: "history.ReplicationTaskGenerationQPS",
HistoryEnableRPCReplication: "history.EnableRPCReplication",
HistoryEnableKafkaReplication: "history.EnableKafkaReplication",
HistoryEnableCleanupReplicationTask: "history.EnableCleanupReplicationTask",
Expand Down Expand Up @@ -853,6 +854,8 @@ const (
ReplicationTaskProcessorHostQPS
// ReplicationTaskProcessorShardQPS is the qps of task processing rate limiter on shard level
ReplicationTaskProcessorShardQPS
//ReplicationTaskGenerationQPS is the wait time between each replication task generation qps
ReplicationTaskGenerationQPS
// HistoryEnableRPCReplication is the feature flag for RPC replication
HistoryEnableRPCReplication
// HistoryEnableKafkaReplication is the migration flag for Kafka replication
Expand Down
3 changes: 3 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ type Config struct {
ReplicationTaskProcessorStartWaitJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn
ReplicationTaskGenerationQPS dynamicconfig.FloatPropertyFn

// TODO: those two flags are for migration. Consider remove them after the migration complete
EnableRPCReplication dynamicconfig.BoolPropertyFn
EnableKafkaReplication dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -448,6 +450,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
EnableRPCReplication: dc.GetBoolProperty(dynamicconfig.HistoryEnableRPCReplication, false),
EnableKafkaReplication: dc.GetBoolProperty(dynamicconfig.HistoryEnableKafkaReplication, true),
EnableCleanupReplicationTask: dc.GetBoolProperty(dynamicconfig.HistoryEnableCleanupReplicationTask, true),
ReplicationTaskGenerationQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskGenerationQPS, 100),

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery, true),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain, false),
Expand Down
4 changes: 3 additions & 1 deletion service/history/replication/task_fetcher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion service/history/replicatorQueueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
Expand All @@ -62,6 +63,7 @@ type (
queueAckMgr

lastShardSyncTimestamp time.Time
rateLimiter *quotas.DynamicRateLimiter
}
)

Expand Down Expand Up @@ -141,7 +143,9 @@ func newReplicatorQueueProcessor(
)
processor.queueAckMgr = queueAckMgr
processor.queueProcessorBase = queueProcessorBase

processor.rateLimiter = quotas.NewDynamicRateLimiter(func() float64 {
return config.ReplicationTaskGenerationQPS()
})
return processor
}

Expand Down Expand Up @@ -449,6 +453,7 @@ func (p *replicatorQueueProcessorImpl) getTasks(
var replicationTasks []*replicator.ReplicationTask
readLevel := lastReadTaskID
for _, taskInfo := range taskInfoList {
_ = p.rateLimiter.Wait(ctx)
var replicationTask *replicator.ReplicationTask
op := func() error {
var err error
Expand Down

0 comments on commit 00c60df

Please sign in to comment.