Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emitting metrics if DLQ is not empty #3389

Merged
merged 6 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ const (
PersistencePutReplicationTaskToDLQScope
// PersistenceGetReplicationTasksFromDLQScope tracks PersistenceGetReplicationTasksFromDLQScope calls made by service to persistence layer
PersistenceGetReplicationTasksFromDLQScope
// PersistenceGetReplicationDLQSizeScope tracks PersistenceGetReplicationDLQSizeScope calls made by service to persistence layer
PersistenceGetReplicationDLQSizeScope
// PersistenceDeleteReplicationTaskFromDLQScope tracks PersistenceDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
PersistenceDeleteReplicationTaskFromDLQScope
// PersistenceRangeDeleteReplicationTaskFromDLQScope tracks PersistenceRangeDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
Expand Down Expand Up @@ -1076,6 +1078,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceRangeCompleteReplicationTaskScope: {operation: "RangeCompleteReplicationTask"},
PersistencePutReplicationTaskToDLQScope: {operation: "PutReplicationTaskToDLQ"},
PersistenceGetReplicationTasksFromDLQScope: {operation: "GetReplicationTasksFromDLQ"},
PersistenceGetReplicationDLQSizeScope: {operation: "GetReplicationDLQSize"},
PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"},
PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"},
PersistenceCreateFailoverMarkerTasksScope: {operation: "CreateFailoverMarkerTasks"},
Expand Down Expand Up @@ -1803,6 +1806,8 @@ const (
ReplicationDLQFailed
ReplicationDLQMaxLevelGauge
ReplicationDLQAckLevelGauge
ReplicationDLQProbeFailed
ReplicationDLQSize
GetReplicationMessagesForShardLatency
GetDLQReplicationMessagesLatency
EventReapplySkippedCount
Expand Down Expand Up @@ -2238,6 +2243,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ReplicationDLQFailed: {metricName: "replication_dlq_enqueue_failed", metricType: Counter},
ReplicationDLQMaxLevelGauge: {metricName: "replication_dlq_max_level", metricType: Gauge},
ReplicationDLQAckLevelGauge: {metricName: "replication_dlq_ack_level", metricType: Gauge},
ReplicationDLQProbeFailed: {metricName: "replication_dlq_probe_failed", metricType: Counter},
ReplicationDLQSize: {metricName: "replication_dlq_size", metricType: Gauge},
GetReplicationMessagesForShardLatency: {metricName: "get_replication_messages_for_shard", metricType: Timer},
GetDLQReplicationMessagesLatency: {metricName: "get_dlq_replication_messages", metricType: Timer},
EventReapplySkippedCount: {metricName: "event_reapply_skipped_count", metricType: Counter},
Expand Down
23 changes: 23 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,29 @@ func (_m *ExecutionManager) GetReplicationTasksFromDLQ(request *persistence.GetR
return r0, r1
}

// GetReplicationDLQSize provides a mock function with given fields: request
//
// The first configured return value may be either a ready-made response or a
// function that computes one from the request; the second may be either an
// error or a function that computes one from the request.
func (_m *ExecutionManager) GetReplicationDLQSize(request *persistence.GetReplicationDLQSizeRequest) (*persistence.GetReplicationDLQSizeResponse, error) {
	ret := _m.Called(request)

	var r0 *persistence.GetReplicationDLQSizeResponse
	switch first := ret.Get(0).(type) {
	case func(*persistence.GetReplicationDLQSizeRequest) *persistence.GetReplicationDLQSizeResponse:
		r0 = first(request)
	case nil:
		// nothing configured for the response; r0 stays nil
	default:
		// any other non-nil value must be a response pointer (panics otherwise)
		r0 = first.(*persistence.GetReplicationDLQSizeResponse)
	}

	errFn, isFn := ret.Get(1).(func(*persistence.GetReplicationDLQSizeRequest) error)
	if !isFn {
		return r0, ret.Error(1)
	}
	return r0, errFn(request)
}

// DeleteReplicationTaskFromDLQ provides a mock function with given fields: request
func (_m *ExecutionManager) DeleteReplicationTaskFromDLQ(
request *persistence.DeleteReplicationTaskFromDLQRequest,
Expand Down
39 changes: 39 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,14 @@ workflow_state = ? ` +
`and task_id > ? ` +
`and task_id <= ?`

// templateGetDLQSizeQuery counts the rows of the executions table that match
// the bound shard_id, type, domain_id, workflow_id and run_id values.
templateGetDLQSizeQuery = `SELECT count(1) as count ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ?`

templateCompleteTransferTaskQuery = `DELETE FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -2838,6 +2846,37 @@ func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
return d.populateGetReplicationTasksResponse(query)
}

// GetReplicationDLQSize returns the number of rows counted by
// templateGetDLQSizeQuery for this shard's DLQ partition, using
// request.SourceClusterName as the workflow_id key.
//
// Errors:
//   - *workflow.ServiceBusyError when the query fails with a throttling error.
//   - *workflow.InternalServiceError for any other query failure, or when the
//     result does not contain an int64 "count" column.
func (d *cassandraPersistence) GetReplicationDLQSize(
	request *p.GetReplicationDLQSizeRequest,
) (*p.GetReplicationDLQSizeResponse, error) {

	// NOTE(review): no explicit consistency level is set on this query, so it
	// runs with whatever the session is configured with — confirm that is
	// acceptable for a metrics-only read.
	query := d.session.Query(templateGetDLQSizeQuery,
		d.shardID,
		rowTypeDLQ,
		rowTypeDLQDomainID,
		request.SourceClusterName,
		rowTypeDLQRunID,
	)

	result := make(map[string]interface{})
	if err := query.MapScan(result); err != nil {
		if isThrottlingError(err) {
			return nil, &workflow.ServiceBusyError{
				Message: fmt.Sprintf("GetReplicationDLQSize operation failed. Error: %v", err),
			}
		}

		return nil, &workflow.InternalServiceError{
			Message: fmt.Sprintf("GetReplicationDLQSize operation failed. Error: %v", err),
		}
	}

	// Use the checked form of the assertion: a missing or differently typed
	// "count" value must surface as an error rather than panic the caller.
	queueSize, ok := result["count"].(int64)
	if !ok {
		return nil, &workflow.InternalServiceError{
			Message: fmt.Sprintf("GetReplicationDLQSize operation failed. Unexpected count value: %v", result["count"]),
		}
	}
	return &p.GetReplicationDLQSizeResponse{
		Size: queueSize,
	}, nil
}

func (d *cassandraPersistence) DeleteReplicationTaskFromDLQ(
request *p.DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
11 changes: 11 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,11 @@ type (
GetReplicationTasksRequest
}

// GetReplicationDLQSizeRequest is used to get the size of the replication DLQ
GetReplicationDLQSizeRequest struct {
// SourceClusterName is the name of the source cluster whose DLQ is measured
SourceClusterName string
}

// DeleteReplicationTaskFromDLQRequest is used to delete replication task from DLQ
DeleteReplicationTaskFromDLQRequest struct {
SourceClusterName string
Expand All @@ -1038,6 +1043,11 @@ type (
// GetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ
GetReplicationTasksFromDLQResponse = GetReplicationTasksResponse

// GetReplicationDLQSizeResponse is the response for GetReplicationDLQSize
GetReplicationDLQSizeResponse struct {
// Size is the row count reported by the persistence store for the requested DLQ
Size int64
}

// RangeCompleteTimerTaskRequest is used to complete a range of tasks in the timer task queue
RangeCompleteTimerTaskRequest struct {
InclusiveBeginTimestamp time.Time
Expand Down Expand Up @@ -1523,6 +1533,7 @@ type (
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
GetReplicationDLQSize(request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,12 @@ func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
return m.persistence.GetReplicationTasksFromDLQ(request)
}

// GetReplicationDLQSize hands the request to the underlying persistence store
// unchanged and returns whatever the store returns.
func (m *executionManagerImpl) GetReplicationDLQSize(request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error) {
	store := m.persistence
	return store.GetReplicationDLQSize(request)
}

func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5276,6 +5276,9 @@ func (s *ExecutionManagerSuite) TestReplicationDLQ() {
resp, err = s.GetReplicationTasksFromDLQ(sourceCluster, 0, 2, 2, nil)
s.NoError(err)
s.Len(resp.Tasks, 2)
sizeResp, err := s.GetReplicationDLQSize(sourceCluster)
s.NoError(err)
s.Equal(int64(2), sizeResp.Size)
err = s.RangeDeleteReplicationTaskFromDLQ(sourceCluster, 0, 2)
s.NoError(err)
resp, err = s.GetReplicationTasksFromDLQ(sourceCluster, 0, 2, 2, nil)
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,16 @@ func (s *TestBase) GetReplicationTasksFromDLQ(
})
}

// GetReplicationDLQSize is a test helper that asks the ExecutionManager for
// the replication DLQ size of the given source cluster.
func (s *TestBase) GetReplicationDLQSize(sourceCluster string) (*p.GetReplicationDLQSizeResponse, error) {
	sizeRequest := &p.GetReplicationDLQSizeRequest{SourceClusterName: sourceCluster}
	return s.ExecutionManager.GetReplicationDLQSize(sizeRequest)
}

// DeleteReplicationTaskFromDLQ is a utility method to delete a replication task info
func (s *TestBase) DeleteReplicationTaskFromDLQ(
sourceCluster string,
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type (
RangeCompleteReplicationTask(request *RangeCompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
GetReplicationDLQSize(request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,22 @@ func (p *workflowExecutionPersistenceClient) GetReplicationTasksFromDLQ(
return response, err
}

// GetReplicationDLQSize delegates to the wrapped persistence client while
// recording a request counter, a latency timer and, on failure, an error
// metric under the PersistenceGetReplicationDLQSizeScope scope.
func (p *workflowExecutionPersistenceClient) GetReplicationDLQSize(request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error) {
	const scope = metrics.PersistenceGetReplicationDLQSizeScope

	p.metricClient.IncCounter(scope, metrics.PersistenceRequests)

	// The timer brackets only the delegated call.
	stopwatch := p.metricClient.StartTimer(scope, metrics.PersistenceLatency)
	resp, callErr := p.persistence.GetReplicationDLQSize(request)
	stopwatch.Stop()

	if callErr == nil {
		return resp, nil
	}

	p.updateErrorMetric(scope, callErr)
	return resp, callErr
}

func (p *workflowExecutionPersistenceClient) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,16 @@ func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasksFromD
return p.persistence.GetReplicationTasksFromDLQ(request)
}

// GetReplicationDLQSize delegates to the wrapped persistence client when the
// rate limiter allows the call, and returns ErrPersistenceLimitExceeded
// without calling the store otherwise.
func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationDLQSize(request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error) {
	if p.rateLimiter.Allow() {
		return p.persistence.GetReplicationDLQSize(request)
	}
	return nil, ErrPersistenceLimitExceeded
}

func (p *workflowExecutionRateLimitedPersistenceClient) DeleteReplicationTaskFromDLQ(
request *DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
25 changes: 25 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,31 @@ func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(
}
}

// GetReplicationDLQSize returns the count reported by SelectFromReplicationDLQ
// for this manager's shard and the requested source cluster.
//
// sql.ErrNoRows is translated into a size of 0; every other error is wrapped
// in a *workflow.InternalServiceError.
func (m *sqlExecutionManager) GetReplicationDLQSize(request *p.GetReplicationDLQSizeRequest) (*p.GetReplicationDLQSizeResponse, error) {
	filter := &sqlplugin.ReplicationTaskDLQFilter{
		SourceClusterName: request.SourceClusterName,
		ShardID:           m.shardID,
	}
	count, err := m.db.SelectFromReplicationDLQ(filter)

	if err == nil {
		return &p.GetReplicationDLQSizeResponse{Size: count}, nil
	}
	if err == sql.ErrNoRows {
		return &p.GetReplicationDLQSizeResponse{Size: 0}, nil
	}
	return nil, &workflow.InternalServiceError{
		Message: fmt.Sprintf("GetReplicationDLQSize operation failed. Select failed: %v", err),
	}
}

func (m *sqlExecutionManager) DeleteReplicationTaskFromDLQ(
request *p.DeleteReplicationTaskFromDLQRequest,
) error {
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ type (
SourceClusterName string
}

// ReplicationTaskDLQFilter contains the column names within replication_tasks_dlq table that
// can be used to filter results through a WHERE clause
ReplicationTaskDLQFilter struct {
// SourceClusterName is matched against the source_cluster_name column
SourceClusterName string
// ShardID is matched against the shard_id column
ShardID int
}

// TimerTasksRow represents a row in timer_tasks table
TimerTasksRow struct {
ShardID int
Expand Down Expand Up @@ -607,6 +614,9 @@ type (
// SelectFromReplicationTasksDLQ returns one or more rows from replication_tasks_dlq table
// Required filter params - {sourceClusterName, shardID, minTaskID, pageSize}
SelectFromReplicationTasksDLQ(filter *ReplicationTasksDLQFilter) ([]ReplicationTasksRow, error)
// SelectFromReplicationDLQ returns the number of rows in replication_tasks_dlq table that match the filter
// Required filter params - {sourceClusterName, shardID}
SelectFromReplicationDLQ(filter *ReplicationTaskDLQFilter) (int64, error)
// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
// Required filter params - {sourceClusterName, shardID, taskID}
DeleteMessageFromReplicationTasksDLQ(filter *ReplicationTasksDLQFilter) (sql.Result, error)
Expand Down
17 changes: 17 additions & 0 deletions common/persistence/sql/sqlplugin/mysql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ task_id > ? AND
task_id <= ?
ORDER BY task_id LIMIT ?`

// getReplicationTaskDLQQuery counts the rows in replication_tasks_dlq for one
// source cluster name and shard ID.
getReplicationTaskDLQQuery = `SELECT count(1) as count FROM replication_tasks_dlq WHERE
source_cluster_name = ? AND
shard_id = ?`

bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)`
Expand Down Expand Up @@ -351,6 +355,19 @@ func (mdb *db) SelectFromReplicationTasksDLQ(filter *sqlplugin.ReplicationTasksD
return rows, err
}

// SelectFromReplicationDLQ returns the number of rows in the
// replication_tasks_dlq table that match the filter's source cluster name and
// shard ID.
//
// A failure of the underlying query is returned unchanged together with a zero
// count. If the query succeeds but yields no result row, sql.ErrNoRows is
// returned instead of indexing into an empty slice.
func (mdb *db) SelectFromReplicationDLQ(filter *sqlplugin.ReplicationTaskDLQFilter) (int64, error) {
	var size []int64
	if err := mdb.conn.Select(
		&size, getReplicationTaskDLQQuery,
		filter.SourceClusterName,
		filter.ShardID,
	); err != nil {
		return 0, err
	}
	// Select reports no error for an empty result set, so guard the index.
	if len(size) == 0 {
		return 0, sql.ErrNoRows
	}
	return size[0], nil
}

// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
func (mdb *db) DeleteMessageFromReplicationTasksDLQ(
filter *sqlplugin.ReplicationTasksDLQFilter,
Expand Down
16 changes: 16 additions & 0 deletions common/persistence/sql/sqlplugin/postgres/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ shard_id = $2 AND
task_id > $3 AND
task_id <= $4
ORDER BY task_id LIMIT $5`
// getReplicationTaskDLQQuery counts the rows in replication_tasks_dlq for one
// source cluster name and shard ID.
getReplicationTaskDLQQuery = `SELECT count(1) as count FROM replication_tasks_dlq WHERE
source_cluster_name = $1 AND
shard_id = $2`

bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding`
createBufferedEventsQuery = `INSERT INTO buffered_events(` + bufferedEventsColumns + `)
Expand Down Expand Up @@ -350,6 +353,19 @@ func (pdb *db) SelectFromReplicationTasksDLQ(filter *sqlplugin.ReplicationTasksD
return rows, err
}

// SelectFromReplicationDLQ returns the number of rows in the
// replication_tasks_dlq table that match the filter's source cluster name and
// shard ID.
//
// A failure of the underlying query is returned unchanged together with a zero
// count. If the query succeeds but yields no result row, sql.ErrNoRows is
// returned instead of indexing into an empty slice.
func (pdb *db) SelectFromReplicationDLQ(filter *sqlplugin.ReplicationTaskDLQFilter) (int64, error) {
	var size []int64
	if err := pdb.conn.Select(
		&size, getReplicationTaskDLQQuery,
		filter.SourceClusterName,
		filter.ShardID,
	); err != nil {
		return 0, err
	}
	// Select reports no error for an empty result set, so guard the index.
	if len(size) == 0 {
		return 0, sql.ErrNoRows
	}
	return size[0], nil
}

// DeleteMessageFromReplicationTasksDLQ deletes one row from replication_tasks_dlq table
func (pdb *db) DeleteMessageFromReplicationTasksDLQ(
filter *sqlplugin.ReplicationTasksDLQFilter,
Expand Down
Loading