Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emitting metrics if DLQ is not empty #3389

Merged
merged 6 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ const (
PersistencePutReplicationTaskToDLQScope
// PersistenceGetReplicationTasksFromDLQScope tracks PersistenceGetReplicationTasksFromDLQScope calls made by service to persistence layer
PersistenceGetReplicationTasksFromDLQScope
// PersistenceGetReplicationTaskFromDLQScope tracks PersistenceGetReplicationTaskFromDLQScope calls made by service to persistence layer
PersistenceGetReplicationTaskFromDLQScope
// PersistenceDeleteReplicationTaskFromDLQScope tracks PersistenceDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
PersistenceDeleteReplicationTaskFromDLQScope
// PersistenceRangeDeleteReplicationTaskFromDLQScope tracks PersistenceRangeDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
Expand Down Expand Up @@ -1076,6 +1078,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceRangeCompleteReplicationTaskScope: {operation: "RangeCompleteReplicationTask"},
PersistencePutReplicationTaskToDLQScope: {operation: "PutReplicationTaskToDLQ"},
PersistenceGetReplicationTasksFromDLQScope: {operation: "GetReplicationTasksFromDLQ"},
PersistenceGetReplicationTaskFromDLQScope: {operation: "GetReplicationTaskFromDLQ"},
PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"},
PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"},
PersistenceCreateFailoverMarkerTasksScope: {operation: "CreateFailoverMarkerTasks"},
Expand Down Expand Up @@ -1803,6 +1806,8 @@ const (
ReplicationDLQFailed
ReplicationDLQMaxLevelGauge
ReplicationDLQAckLevelGauge
ReplicationDLQProbeFailed
ReplicationDLQNotEmptyCount
GetReplicationMessagesForShardLatency
GetDLQReplicationMessagesLatency
EventReapplySkippedCount
Expand Down Expand Up @@ -2238,6 +2243,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationDLQFailed: {metricName: "replication_dlq_enqueue_failed", metricType: Counter},
ReplicationDLQMaxLevelGauge: {metricName: "replication_dlq_max_level", metricType: Gauge},
ReplicationDLQAckLevelGauge: {metricName: "replication_dlq_ack_level", metricType: Gauge},
ReplicationDLQProbeFailed: {metricName: "replication_dlq_probe_failed", metricType: Counter},
ReplicationDLQNotEmptyCount: {metricName: "replication_dlq_not_empty", metricType: Counter},
GetReplicationMessagesForShardLatency: {metricName: "get_replication_messages_for_shard", metricType: Timer},
GetDLQReplicationMessagesLatency: {metricName: "get_dlq_replication_messages", metricType: Timer},
EventReapplySkippedCount: {metricName: "event_reapply_skipped_count", metricType: Counter},
Expand Down
23 changes: 23 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,29 @@ func (_m *ExecutionManager) GetReplicationTasksFromDLQ(request *persistence.GetR
return r0, r1
}

// GetReplicationTaskFromDLQ provides a mock function with given fields: request
func (_m *ExecutionManager) GetReplicationTaskFromDLQ(request *persistence.GetReplicationTaskFromDLQRequest) (*persistence.GetReplicationTaskFromDLQResponse, error) {
ret := _m.Called(request)

var r0 *persistence.GetReplicationTaskFromDLQResponse
if rf, ok := ret.Get(0).(func(*persistence.GetReplicationTaskFromDLQRequest) *persistence.GetReplicationTaskFromDLQResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetReplicationTaskFromDLQResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetReplicationTaskFromDLQRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// DeleteReplicationTaskFromDLQ provides a mock function with given fields: request
func (_m *ExecutionManager) DeleteReplicationTaskFromDLQ(
request *persistence.DeleteReplicationTaskFromDLQRequest,
Expand Down
25 changes: 25 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,15 @@ workflow_state = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

templateGetDLQReplicationTaskQuery = `SELECT replication ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`limit 1`
yux0 marked this conversation as resolved.
Show resolved Hide resolved

templateCompleteTransferTaskQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -2838,6 +2847,22 @@ func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
return d.populateGetReplicationTasksResponse(query)
}

func (d *cassandraPersistence) GetReplicationTaskFromDLQ(
request *p.GetReplicationTaskFromDLQRequest,
) (*p.GetReplicationTaskFromDLQResponse, error) {

// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
query := d.session.Query(templateGetDLQReplicationTaskQuery,
d.shardID,
rowTypeDLQ,
rowTypeDLQDomainID,
request.SourceClusterName,
rowTypeDLQRunID,
)

return d.populateGetReplicationTasksResponse(query)
}

func (d *cassandraPersistence) DeleteReplicationTaskFromDLQ(
request *p.DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
9 changes: 9 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,11 @@ type (
GetReplicationTasksRequest
}

// GetReplicationTaskFromDLQRequest is used to get one replication task from dlq
GetReplicationTaskFromDLQRequest struct {
SourceClusterName string
}

// DeleteReplicationTaskFromDLQRequest is used to delete replication task from DLQ
DeleteReplicationTaskFromDLQRequest struct {
SourceClusterName string
Expand All @@ -1038,6 +1043,9 @@ type (
// GetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ
GetReplicationTasksFromDLQResponse = GetReplicationTasksResponse

// GetReplicationTaskFromDLQResponse is the response for GetReplicationTaskFromDLQ
GetReplicationTaskFromDLQResponse = GetReplicationTasksResponse

// RangeCompleteTimerTaskRequest is used to complete a range of tasks in the timer task queue
RangeCompleteTimerTaskRequest struct {
InclusiveBeginTimestamp time.Time
Expand Down Expand Up @@ -1523,6 +1531,7 @@ type (
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
GetReplicationTaskFromDLQ(request *GetReplicationTaskFromDLQRequest) (*GetReplicationTaskFromDLQResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,12 @@ func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
return m.persistence.GetReplicationTasksFromDLQ(request)
}

func (m *executionManagerImpl) GetReplicationTaskFromDLQ(
request *GetReplicationTaskFromDLQRequest,
) (*GetReplicationTaskFromDLQResponse, error) {
return m.persistence.GetReplicationTaskFromDLQ(request)
}

func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5276,6 +5276,9 @@ func (s *ExecutionManagerSuite) TestReplicationDLQ() {
resp, err = s.GetReplicationTasksFromDLQ(sourceCluster, 0, 2, 2, nil)
s.NoError(err)
s.Len(resp.Tasks, 2)
resp, err = s.GetReplicationTaskFromDLQ(sourceCluster)
s.NoError(err)
s.Len(resp.Tasks, 1)
err = s.RangeDeleteReplicationTaskFromDLQ(sourceCluster, 0, 2)
s.NoError(err)
resp, err = s.GetReplicationTasksFromDLQ(sourceCluster, 0, 2, 2, nil)
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,16 @@ func (s *TestBase) GetReplicationTasksFromDLQ(
})
}

// GetReplicationTaskFromDLQ is a utility method to read one replication task info
func (s *TestBase) GetReplicationTaskFromDLQ(
sourceCluster string,
) (*p.GetReplicationTaskFromDLQResponse, error) {

return s.ExecutionManager.GetReplicationTaskFromDLQ(&p.GetReplicationTaskFromDLQRequest{
SourceClusterName: sourceCluster,
})
}

// DeleteReplicationTaskFromDLQ is a utility method to delete a replication task info
func (s *TestBase) DeleteReplicationTaskFromDLQ(
sourceCluster string,
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type (
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
GetReplicationTaskFromDLQ(request *GetReplicationTaskFromDLQRequest) (*GetReplicationTaskFromDLQResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,22 @@ func (p *workflowExecutionPersistenceClient) GetReplicationTasksFromDLQ(
return response, err
}

func (p *workflowExecutionPersistenceClient) GetReplicationTaskFromDLQ(
request *GetReplicationTaskFromDLQRequest,
) (*GetReplicationTaskFromDLQResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetReplicationTaskFromDLQScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceGetReplicationTaskFromDLQScope, metrics.PersistenceLatency)
response, err := p.persistence.GetReplicationTaskFromDLQ(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceGetReplicationTaskFromDLQScope, err)
}

return response, err
}

func (p *workflowExecutionPersistenceClient) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasksFromD
return p.persistence.GetReplicationTasksFromDLQ(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTaskFromDLQ(
request *GetReplicationTaskFromDLQRequest,
) (*GetReplicationTaskFromDLQResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}

return p.persistence.GetReplicationTaskFromDLQ(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
21 changes: 21 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,27 @@ func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(
}
}

func (m *sqlExecutionManager) GetReplicationTaskFromDLQ(
request *p.GetReplicationTaskFromDLQRequest,
) (*p.GetReplicationTaskFromDLQResponse, error) {

rows, err := m.db.SelectFromReplicationTaskDLQ(&sqlplugin.ReplicationTaskDLQFilter{
SourceClusterName: request.SourceClusterName,
ShardID: m.shardID,
})

switch err {
case nil:
return m.populateGetReplicationTasksResponse(rows, 0)
case sql.ErrNoRows:
return &p.GetReplicationTasksResponse{}, nil
default:
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetReplicationTasks operation failed. Select failed: %v", err),
}
}
}

func (m *sqlExecutionManager) DeleteReplicationTaskFromDLQ(
request *p.DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ type (
SourceClusterName string
}

// ReplicationTaskDLQFilter contains the column names within replication_tasks_dlq table that
// can be used to filter results through a WHERE clause
ReplicationTaskDLQFilter struct {
SourceClusterName string
ShardID int
}

// TimerTasksRow represents a row in timer_tasks table
TimerTasksRow struct {
ShardID int
Expand Down Expand Up @@ -607,6 +614,9 @@ type (
// SelectFromReplicationTasksDLQ returns one or more rows from replication_tasks_dlq table
// Required filter params - {sourceClusterName, shardID, minTaskID, pageSize}
SelectFromReplicationTasksDLQ(filter *ReplicationTasksDLQFilter) ([]ReplicationTasksRow, error)
// SelectFromReplicationTasksDLQ returns one row from replication_tasks_dlq table
// Required filter params - {sourceClusterName}
SelectFromReplicationTaskDLQ(filter *ReplicationTaskDLQFilter) ([]ReplicationTasksRow, error)
// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
// Required filter params - {sourceClusterName, shardID, taskID}
DeleteMessageFromReplicationTasksDLQ(filter *ReplicationTasksDLQFilter) (sql.Result, error)
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/sql/sqlplugin/mysql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ task_id > ? AND
task_id <= ?
ORDER BY task_id LIMIT ?`

getReplicationTaskDLQQuery = `SELECT task_id, data, data_encoding FROM replication_tasks_dlq WHERE
source_cluster_name = ? AND
shard_id = ? LIMIT 1`

bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)`
Expand Down Expand Up @@ -351,6 +355,16 @@ func (mdb *db) SelectFromReplicationTasksDLQ(filter *sqlplugin.ReplicationTasksD
return rows, err
}

// SelectFromReplicationTaskDLQ reads one row from replication_tasks_dlq table
func (mdb *db) SelectFromReplicationTaskDLQ(filter *sqlplugin.ReplicationTaskDLQFilter) ([]sqlplugin.ReplicationTasksRow, error) {
var rows []sqlplugin.ReplicationTasksRow
err := mdb.conn.Select(
&rows, getReplicationTaskDLQQuery,
filter.SourceClusterName,
filter.ShardID)
return rows, err
}

// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
func (mdb *db) DeleteMessageFromReplicationTasksDLQ(
filter *sqlplugin.ReplicationTasksDLQFilter,
Expand Down
13 changes: 13 additions & 0 deletions common/persistence/sql/sqlplugin/postgres/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ shard_id = $2 AND
task_id > $3 AND
task_id <= $4
ORDER BY task_id LIMIT $5`
getReplicationTaskDLQQuery = `SELECT task_id, data, data_encoding FROM replication_tasks_dlq WHERE
source_cluster_name = $1 AND
shard_id = $2 LIMIT 1`

bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
Expand Down Expand Up @@ -350,6 +353,16 @@ func (pdb *db) SelectFromReplicationTasksDLQ(filter *sqlplugin.ReplicationTasksD
return rows, err
}

// SelectFromReplicationTaskDLQ reads one row from replication_tasks_dlq table
func (pdb *db) SelectFromReplicationTaskDLQ(filter *sqlplugin.ReplicationTaskDLQFilter) ([]sqlplugin.ReplicationTasksRow, error) {
var rows []sqlplugin.ReplicationTasksRow
err := pdb.conn.Select(
&rows, getReplicationTaskDLQQuery,
filter.SourceClusterName,
filter.ShardID)
return rows, err
}

// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
func (pdb *db) DeleteMessageFromReplicationTasksDLQ(
filter *sqlplugin.ReplicationTasksDLQFilter,
Expand Down
36 changes: 36 additions & 0 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
replicationTimeout = 30 * time.Second
taskErrorRetryBackoffCoefficient = 1.2
dlqErrorRetryWait = time.Second
dlqMetricsEmitTimerInterval = 5 * time.Minute
dlqMetricsEmitTimerCoefficient = 0.05
)

var (
Expand Down Expand Up @@ -149,6 +151,7 @@ func (p *taskProcessorImpl) Start() {
go p.processorLoop()
go p.syncShardStatusLoop()
go p.cleanupReplicationTaskLoop()
go p.emitDLQSizeMetricsLoop()
p.logger.Info("ReplicationTaskProcessor started.")
}

Expand Down Expand Up @@ -508,6 +511,39 @@ func (p *taskProcessorImpl) generateDLQRequest(
}
}

func (p *taskProcessorImpl) emitDLQSizeMetricsLoop() {
timer := time.NewTimer(backoff.JitDuration(
dlqMetricsEmitTimerInterval,
dlqMetricsEmitTimerCoefficient,
))
staticRequest := &persistence.GetReplicationTaskFromDLQRequest{
SourceClusterName: p.sourceCluster,
}
defer timer.Stop()

for {
select {
case <-timer.C:
resp, err := p.shard.GetExecutionManager().GetReplicationTaskFromDLQ(staticRequest)
if err != nil {
p.logger.Error("failed to get one task from replication DLQ", tag.Error(err))
p.metricsClient.Scope(metrics.ReplicationDLQStatsScope).IncCounter(metrics.ReplicationDLQProbeFailed)
}

if len(resp.Tasks) > 0 {
p.metricsClient.Scope(metrics.ReplicationDLQStatsScope).IncCounter(metrics.ReplicationDLQNotEmptyCount)
}
timer.Reset(backoff.JitDuration(
dlqMetricsEmitTimerInterval,
dlqMetricsEmitTimerCoefficient,
))
case <-p.done:
timer.Stop()
return
}
}
}

func isTransientRetryableError(err error) bool {
switch err.(type) {
case *shared.BadRequestError:
Expand Down