Skip to content

Commit

Permalink
Emitting metrics if DLQ is not empty (cadence-workflow#3389)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed May 4, 2021
1 parent ea64ad8 commit 5ef8ad0
Show file tree
Hide file tree
Showing 16 changed files with 241 additions and 10 deletions.
7 changes: 7 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ const (
PersistencePutReplicationTaskToDLQScope
// PersistenceGetReplicationTasksFromDLQScope tracks PersistenceGetReplicationTasksFromDLQScope calls made by service to persistence layer
PersistenceGetReplicationTasksFromDLQScope
// PersistenceGetReplicationDLQSizeScope tracks PersistenceGetReplicationDLQSizeScope calls made by service to persistence layer
PersistenceGetReplicationDLQSizeScope
// PersistenceDeleteReplicationTaskFromDLQScope tracks PersistenceDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
PersistenceDeleteReplicationTaskFromDLQScope
// PersistenceRangeDeleteReplicationTaskFromDLQScope tracks PersistenceRangeDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
Expand Down Expand Up @@ -1076,6 +1078,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceRangeCompleteReplicationTaskScope: {operation: "RangeCompleteReplicationTask"},
PersistencePutReplicationTaskToDLQScope: {operation: "PutReplicationTaskToDLQ"},
PersistenceGetReplicationTasksFromDLQScope: {operation: "GetReplicationTasksFromDLQ"},
PersistenceGetReplicationDLQSizeScope: {operation: "GetReplicationDLQSize"},
PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"},
PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"},
PersistenceCreateFailoverMarkerTasksScope: {operation: "CreateFailoverMarkerTasks"},
Expand Down Expand Up @@ -1803,6 +1806,8 @@ const (
ReplicationDLQFailed
ReplicationDLQMaxLevelGauge
ReplicationDLQAckLevelGauge
ReplicationDLQProbeFailed
ReplicationDLQSize
GetReplicationMessagesForShardLatency
GetDLQReplicationMessagesLatency
EventReapplySkippedCount
Expand Down Expand Up @@ -2238,6 +2243,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationDLQFailed: {metricName: "replication_dlq_enqueue_failed", metricType: Counter},
ReplicationDLQMaxLevelGauge: {metricName: "replication_dlq_max_level", metricType: Gauge},
ReplicationDLQAckLevelGauge: {metricName: "replication_dlq_ack_level", metricType: Gauge},
ReplicationDLQProbeFailed: {metricName: "replication_dlq_probe_failed", metricType: Counter},
ReplicationDLQSize: {metricName: "replication_dlq_size", metricType: Gauge},
GetReplicationMessagesForShardLatency: {metricName: "get_replication_messages_for_shard", metricType: Timer},
GetDLQReplicationMessagesLatency: {metricName: "get_dlq_replication_messages", metricType: Timer},
EventReapplySkippedCount: {metricName: "event_reapply_skipped_count", metricType: Counter},
Expand Down
23 changes: 23 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,29 @@ func (_m *ExecutionManager) GetReplicationTasksFromDLQ(request *persistence.GetR
return r0, r1
}

// GetReplicationDLQSize provides a mock function with given fields: request
func (_m *ExecutionManager) GetReplicationDLQSize(request *persistence.GetReplicationDLQSizeRequest) (*persistence.GetReplicationDLQSizeResponse, error) {
ret := _m.Called(request)

var r0 *persistence.GetReplicationDLQSizeResponse
if rf, ok := ret.Get(0).(func(*persistence.GetReplicationDLQSizeRequest) *persistence.GetReplicationDLQSizeResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetReplicationDLQSizeResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetReplicationDLQSizeRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// DeleteReplicationTaskFromDLQ provides a mock function with given fields: request
func (_m *ExecutionManager) DeleteReplicationTaskFromDLQ(
request *persistence.DeleteReplicationTaskFromDLQRequest,
Expand Down
39 changes: 39 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,14 @@ workflow_state = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

templateGetDLQSizeQuery = `SELECT count(1) as count ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ?`

templateCompleteTransferTaskQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -2838,6 +2846,37 @@ func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
return d.populateGetReplicationTasksResponse(query)
}

func (d *cassandraPersistence) GetReplicationDLQSize(
request *p.GetReplicationDLQSizeRequest,
) (*p.GetReplicationDLQSizeResponse, error) {

// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
query := d.session.Query(templateGetDLQSizeQuery,
d.shardID,
rowTypeDLQ,
rowTypeDLQDomainID,
request.SourceClusterName,
rowTypeDLQRunID,
)

result := make(map[string]interface{})
if err := query.MapScan(result); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("GetReplicationDLQSize operation failed. Error: %v", err),
}
}

return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetReplicationDLQSize operation failed. Error: %v", err),
}
}
queueSize := result["count"].(int64)
return &p.GetReplicationDLQSizeResponse{
Size: queueSize,
}, nil
}

func (d *cassandraPersistence) DeleteReplicationTaskFromDLQ(
request *p.DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
11 changes: 11 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,11 @@ type (
GetReplicationTasksRequest
}

// GetReplicationDLQSizeRequest is used to get one replication task from dlq
GetReplicationDLQSizeRequest struct {
SourceClusterName string
}

// DeleteReplicationTaskFromDLQRequest is used to delete replication task from DLQ
DeleteReplicationTaskFromDLQRequest struct {
SourceClusterName string
Expand All @@ -1038,6 +1043,11 @@ type (
// GetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ
GetReplicationTasksFromDLQResponse = GetReplicationTasksResponse

// GetReplicationDLQSizeResponse is the response for GetReplicationDLQSize
GetReplicationDLQSizeResponse struct {
Size int64
}

// RangeCompleteTimerTaskRequest is used to complete a range of tasks in the timer task queue
RangeCompleteTimerTaskRequest struct {
InclusiveBeginTimestamp time.Time
Expand Down Expand Up @@ -1523,6 +1533,7 @@ type (
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
GetReplicationDLQSize(request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,12 @@ func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
return m.persistence.GetReplicationTasksFromDLQ(request)
}

func (m *executionManagerImpl) GetReplicationDLQSize(
request *GetReplicationDLQSizeRequest,
) (*GetReplicationDLQSizeResponse, error) {
return m.persistence.GetReplicationDLQSize(request)
}

func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5276,6 +5276,9 @@ func (s *ExecutionManagerSuite) TestReplicationDLQ() {
resp, err = s.GetReplicationTasksFromDLQ(sourceCluster, 0, 2, 2, nil)
s.NoError(err)
s.Len(resp.Tasks, 2)
sizeResp, err := s.GetReplicationDLQSize(sourceCluster)
s.NoError(err)
s.Equal(int64(2), sizeResp.Size)
err = s.RangeDeleteReplicationTaskFromDLQ(sourceCluster, 0, 2)
s.NoError(err)
resp, err = s.GetReplicationTasksFromDLQ(sourceCluster, 0, 2, 2, nil)
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,16 @@ func (s *TestBase) GetReplicationTasksFromDLQ(
})
}

// GetReplicationDLQSize is a utility method to read replication dlq size
func (s *TestBase) GetReplicationDLQSize(
sourceCluster string,
) (*p.GetReplicationDLQSizeResponse, error) {

return s.ExecutionManager.GetReplicationDLQSize(&p.GetReplicationDLQSizeRequest{
SourceClusterName: sourceCluster,
})
}

// DeleteReplicationTaskFromDLQ is a utility method to delete a replication task info
func (s *TestBase) DeleteReplicationTaskFromDLQ(
sourceCluster string,
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type (
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
GetReplicationDLQSize(request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,22 @@ func (p *workflowExecutionPersistenceClient) GetReplicationTasksFromDLQ(
return response, err
}

func (p *workflowExecutionPersistenceClient) GetReplicationDLQSize(
request *GetReplicationDLQSizeRequest,
) (*GetReplicationDLQSizeResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetReplicationDLQSizeScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceGetReplicationDLQSizeScope, metrics.PersistenceLatency)
response, err := p.persistence.GetReplicationDLQSize(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceGetReplicationDLQSizeScope, err)
}

return response, err
}

func (p *workflowExecutionPersistenceClient) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasksFromD
return p.persistence.GetReplicationTasksFromDLQ(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationDLQSize(
request *GetReplicationDLQSizeRequest,
) (*GetReplicationDLQSizeResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}

return p.persistence.GetReplicationDLQSize(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
25 changes: 25 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,31 @@ func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(
}
}

func (m *sqlExecutionManager) GetReplicationDLQSize(
request *p.GetReplicationDLQSizeRequest,
) (*p.GetReplicationDLQSizeResponse, error) {

size, err := m.db.SelectFromReplicationDLQ(&sqlplugin.ReplicationTaskDLQFilter{
SourceClusterName: request.SourceClusterName,
ShardID: m.shardID,
})

switch err {
case nil:
return &p.GetReplicationDLQSizeResponse{
Size: size,
}, nil
case sql.ErrNoRows:
return &p.GetReplicationDLQSizeResponse{
Size: 0,
}, nil
default:
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetReplicationDLQSize operation failed. Select failed: %v", err),
}
}
}

func (m *sqlExecutionManager) DeleteReplicationTaskFromDLQ(
request *p.DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ type (
SourceClusterName string
}

// ReplicationTaskDLQFilter contains the column names within replication_tasks_dlq table that
// can be used to filter results through a WHERE clause
ReplicationTaskDLQFilter struct {
SourceClusterName string
ShardID int
}

// TimerTasksRow represents a row in timer_tasks table
TimerTasksRow struct {
ShardID int
Expand Down Expand Up @@ -607,6 +614,9 @@ type (
// SelectFromReplicationTasksDLQ returns one or more rows from replication_tasks_dlq table
// Required filter params - {sourceClusterName, shardID, minTaskID, pageSize}
SelectFromReplicationTasksDLQ(filter *ReplicationTasksDLQFilter) ([]ReplicationTasksRow, error)
// SelectFromReplicationDLQ returns one row from replication_tasks_dlq table
// Required filter params - {sourceClusterName}
SelectFromReplicationDLQ(filter *ReplicationTaskDLQFilter) (int64, error)
// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
// Required filter params - {sourceClusterName, shardID, taskID}
DeleteMessageFromReplicationTasksDLQ(filter *ReplicationTasksDLQFilter) (sql.Result, error)
Expand Down
17 changes: 17 additions & 0 deletions common/persistence/sql/sqlplugin/mysql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ task_id > ? AND
task_id <= ?
ORDER BY task_id LIMIT ?`

getReplicationTaskDLQQuery = `SELECT count(1) as count FROM replication_tasks_dlq WHERE
source_cluster_name = ? AND
shard_id = ?`

bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)`
Expand Down Expand Up @@ -351,6 +355,19 @@ func (mdb *db) SelectFromReplicationTasksDLQ(filter *sqlplugin.ReplicationTasksD
return rows, err
}

// SelectFromReplicationDLQ reads one row from replication_tasks_dlq table
func (mdb *db) SelectFromReplicationDLQ(filter *sqlplugin.ReplicationTaskDLQFilter) (int64, error) {
var size []int64
if err := mdb.conn.Select(
&size, getReplicationTaskDLQQuery,
filter.SourceClusterName,
filter.ShardID,
); err != nil {
return 0, err
}
return size[0], nil
}

// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
func (mdb *db) DeleteMessageFromReplicationTasksDLQ(
filter *sqlplugin.ReplicationTasksDLQFilter,
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/sql/sqlplugin/postgres/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ shard_id = $2 AND
task_id > $3 AND
task_id <= $4
ORDER BY task_id LIMIT $5`
getReplicationTaskDLQQuery = `SELECT count(1) as count FROM replication_tasks_dlq WHERE
source_cluster_name = $1 AND
shard_id = $2`

bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
Expand Down Expand Up @@ -350,6 +353,19 @@ func (pdb *db) SelectFromReplicationTasksDLQ(filter *sqlplugin.ReplicationTasksD
return rows, err
}

// SelectFromReplicationDLQ reads one row from replication_tasks_dlq table
func (pdb *db) SelectFromReplicationDLQ(filter *sqlplugin.ReplicationTaskDLQFilter) (int64, error) {
var size []int64
if err := pdb.conn.Select(
&size, getReplicationTaskDLQQuery,
filter.SourceClusterName,
filter.ShardID,
); err != nil {
return 0, err
}
return size[0], nil
}

// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
func (pdb *db) DeleteMessageFromReplicationTasksDLQ(
filter *sqlplugin.ReplicationTasksDLQFilter,
Expand Down
Loading

0 comments on commit 5ef8ad0

Please sign in to comment.