Skip to content

Commit

Permalink
DB scan admin command retry db operations (cadence-workflow#3184)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored and mkolodezny committed Apr 16, 2020
1 parent 525d1db commit c7caabd
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 45 deletions.
1 change: 1 addition & 0 deletions tools/cli/adminCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func connectToCassandra(c *cli.Context) *gocql.Session {
clusterCfg.SerialConsistency = gocql.LocalSerial
clusterCfg.NumConns = 20
clusterCfg.PoolConfig.HostSelectionPolicy = nil
clusterCfg.Consistency = gocql.LocalQuorum

session, err := clusterCfg.CreateSession()
if err != nil {
Expand Down
66 changes: 51 additions & 15 deletions tools/cli/adminDBCleanCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"os"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"

"github.com/gocql/gocql"
"github.com/urfave/cli"

Expand Down Expand Up @@ -205,26 +208,23 @@ func cleanShard(
WorkflowID: ce.WorkflowID,
RunID: ce.RunID,
}
preconditionForDBCall(&report.TotalDBRequests, limiter)
err = execStore.DeleteWorkflowExecution(deleteConcreteReq)
err = retryDeleteWorkflowExecution(limiter, &report.TotalDBRequests, execStore, deleteConcreteReq)
if err != nil {
report.Handled.FailedCleanedCount++
failedCleanWriter.Add(&ce)
continue
}
report.Handled.SuccessfullyCleanedCount++
successfullyCleanWriter.Add(&ce)
if ce.CorruptedExceptionMetadata.CorruptionType != OpenExecutionInvalidCurrentExecution {
deleteCurrentReq := &persistence.DeleteCurrentWorkflowExecutionRequest{
DomainID: ce.DomainID,
WorkflowID: ce.WorkflowID,
RunID: ce.RunID,
}
// deleting current execution is best effort, the success or failure of the cleanup
// is determined above based on if the concrete execution could be deleted
preconditionForDBCall(&report.TotalDBRequests, limiter)
execStore.DeleteCurrentWorkflowExecution(deleteCurrentReq)
deleteCurrentReq := &persistence.DeleteCurrentWorkflowExecutionRequest{
DomainID: ce.DomainID,
WorkflowID: ce.WorkflowID,
RunID: ce.RunID,
}
// deleting current execution is best effort, the success or failure of the cleanup
// is determined above based on if the concrete execution could be deleted
retryDeleteCurrentWorkflowExecution(limiter, &report.TotalDBRequests, execStore, deleteCurrentReq)

// TODO: we will want to also cleanup history for corrupted workflows, this will be punted on until this is converted to a workflow
}
return report
Expand Down Expand Up @@ -289,13 +289,13 @@ func createCleanOutputDirectories() *CleanOutputDirectories {
SuccessfullyCleanedDirectoryPath: fmt.Sprintf("./clean_%v/successfully_cleaned", now),
FailedCleanedDirectoryPath: fmt.Sprintf("./clean_%v/failed_cleaned", now),
}
if err := os.MkdirAll(cod.ShardCleanReportDirectoryPath, 0766); err != nil {
if err := os.MkdirAll(cod.ShardCleanReportDirectoryPath, 0777); err != nil {
ErrorAndExit("failed to create ShardCleanReportDirectoryPath", err)
}
if err := os.MkdirAll(cod.SuccessfullyCleanedDirectoryPath, 0766); err != nil {
if err := os.MkdirAll(cod.SuccessfullyCleanedDirectoryPath, 0777); err != nil {
ErrorAndExit("failed to create SuccessfullyCleanedDirectoryPath", err)
}
if err := os.MkdirAll(cod.FailedCleanedDirectoryPath, 0766); err != nil {
if err := os.MkdirAll(cod.FailedCleanedDirectoryPath, 0777); err != nil {
ErrorAndExit("failed to create FailedCleanedDirectoryPath", err)
}
fmt.Println("clean results located under: ", fmt.Sprintf("./clean_%v", now))
Expand All @@ -309,3 +309,39 @@ func recordShardCleanReport(file *os.File, sdr *ShardCleanReport) {
}
writeToFile(file, string(data))
}

// retryDeleteWorkflowExecution deletes a concrete workflow execution record,
// retrying transient persistence errors using the standard persistence retry
// policy. Every attempt is rate limited and counted against totalDBRequests
// via preconditionForDBCall.
//
// Returns the last error from backoff.Retry, or nil on success.
func retryDeleteWorkflowExecution(
	limiter *quotas.DynamicRateLimiter,
	totalDBRequests *int64,
	execStore persistence.ExecutionStore,
	req *persistence.DeleteWorkflowExecutionRequest,
) error {
	op := func() error {
		preconditionForDBCall(totalDBRequests, limiter)
		return execStore.DeleteWorkflowExecution(req)
	}

	// Return the retry result directly; checking err != nil just to return
	// err and then returning nil separately was redundant.
	return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
}

// retryDeleteCurrentWorkflowExecution deletes a current workflow execution
// record, retrying transient persistence errors using the standard persistence
// retry policy. Every attempt is rate limited and counted against
// totalDBRequests via preconditionForDBCall.
//
// Returns the last error from backoff.Retry, or nil on success.
func retryDeleteCurrentWorkflowExecution(
	limiter *quotas.DynamicRateLimiter,
	totalDBRequests *int64,
	execStore persistence.ExecutionStore,
	req *persistence.DeleteCurrentWorkflowExecutionRequest,
) error {
	op := func() error {
		preconditionForDBCall(totalDBRequests, limiter)
		return execStore.DeleteCurrentWorkflowExecution(req)
	}

	// Return the retry result directly; checking err != nil just to return
	// err and then returning nil separately was redundant.
	return backoff.Retry(op, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
}
165 changes: 135 additions & 30 deletions tools/cli/adminDBScanCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -69,6 +70,11 @@ const (

const (
historyPageSize = 1
maxDBRetries = 10
)

var (
persistenceOperationRetryPolicy = common.CreatePersistanceRetryPolicy()
)

type (
Expand Down Expand Up @@ -145,7 +151,8 @@ type (
TotalExecutionsCount int64
CorruptedExecutionsCount int64
ExecutionCheckFailureCount int64
NumberOfShardScanFailures int64
NumberOfShardScanFailures int
ShardsFailed []int
PercentageCorrupted float64
PercentageCheckFailure float64
Rates Rates
Expand Down Expand Up @@ -277,8 +284,7 @@ func scanShard(
PageSize: executionsPageSize,
PageToken: token,
}
preconditionForDBCall(&report.TotalDBRequests, limiter)
resp, err := execStore.ListConcreteExecutions(req)
resp, err := retryListConcreteExecutions(limiter, &report.TotalDBRequests, execStore, req)
if err != nil {
report.Failure = &ShardScanReportFailure{
Note: "failed to call ListConcreteExecutions",
Expand Down Expand Up @@ -395,8 +401,7 @@ func verifyHistoryExists(
ShardID: shardID,
PageSize: historyPageSize,
}
preconditionForDBCall(totalDBRequests, limiter)
history, err := historyStore.ReadHistoryBranch(readHistoryBranchReq)
history, err := retryReadHistoryBranch(limiter, totalDBRequests, historyStore, readHistoryBranchReq)

ecf, stillExists := concreteExecutionStillExists(execution, shardID, execStore, limiter, totalDBRequests)
if ecf != nil {
Expand Down Expand Up @@ -530,8 +535,7 @@ func verifyCurrentExecution(
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
}
preconditionForDBCall(totalDBRequests, limiter)
currentExecution, err := execStore.GetCurrentExecution(getCurrentExecutionRequest)
currentExecution, err := retryGetCurrentExecution(limiter, totalDBRequests, execStore, getCurrentExecutionRequest)

ecf, stillOpen := concreteExecutionStillOpen(execution, shardID, execStore, limiter, totalDBRequests)
if ecf != nil {
Expand Down Expand Up @@ -606,8 +610,7 @@ func concreteExecutionStillExists(
RunId: &execution.RunID,
},
}
preconditionForDBCall(totalDBRequests, limiter)
_, err := execStore.GetWorkflowExecution(getConcreteExecution)
_, err := retryGetWorkflowExecution(limiter, totalDBRequests, execStore, getConcreteExecution)
if err == nil {
return nil, true
}
Expand Down Expand Up @@ -641,17 +644,22 @@ func concreteExecutionStillOpen(
RunId: &execution.RunID,
},
}
preconditionForDBCall(totalDBRequests, limiter)
ce, err := execStore.GetWorkflowExecution(getConcreteExecution)
ce, err := retryGetWorkflowExecution(limiter, totalDBRequests, execStore, getConcreteExecution)

if err != nil {
return &ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
Note: "failed to access concrete execution to verify it is still open",
Details: err.Error(),
}, false
switch err.(type) {
case *shared.EntityNotExistsError:
return nil, false
default:
return &ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
Note: "failed to access concrete execution to verify it is still open",
Details: err.Error(),
}, false
}
}

return nil, executionOpen(ce.State.ExecutionInfo)
Expand Down Expand Up @@ -706,13 +714,13 @@ func createScanOutputDirectories() *ScanOutputDirectories {
ExecutionCheckFailureDirectoryPath: fmt.Sprintf("./scan_%v/execution_check_failure", now),
CorruptedExecutionDirectoryPath: fmt.Sprintf("./scan_%v/corrupted_execution", now),
}
if err := os.MkdirAll(sod.ShardScanReportDirectoryPath, 0766); err != nil {
if err := os.MkdirAll(sod.ShardScanReportDirectoryPath, 0777); err != nil {
ErrorAndExit("failed to create ShardScanFailureDirectoryPath", err)
}
if err := os.MkdirAll(sod.ExecutionCheckFailureDirectoryPath, 0766); err != nil {
if err := os.MkdirAll(sod.ExecutionCheckFailureDirectoryPath, 0777); err != nil {
ErrorAndExit("failed to create ExecutionCheckFailureDirectoryPath", err)
}
if err := os.MkdirAll(sod.CorruptedExecutionDirectoryPath, 0766); err != nil {
if err := os.MkdirAll(sod.CorruptedExecutionDirectoryPath, 0777); err != nil {
ErrorAndExit("failed to create CorruptedExecutionDirectoryPath", err)
}
fmt.Println("scan results located under: ", fmt.Sprintf("./scan_%v", now))
Expand All @@ -739,6 +747,7 @@ func includeShardInProgressReport(report *ShardScanReport, progressReport *Progr
progressReport.Rates.TimeRunning = time.Now().Sub(startTime).String()
if report.Failure != nil {
progressReport.NumberOfShardScanFailures++
progressReport.ShardsFailed = append(progressReport.ShardsFailed, report.ShardID)
}
if report.Scanned != nil {
progressReport.CorruptedExecutionsCount += report.Scanned.CorruptedExecutionsCount
Expand All @@ -747,23 +756,29 @@ func includeShardInProgressReport(report *ShardScanReport, progressReport *Progr
progressReport.CorruptionTypeBreakdown.TotalHistoryMissing += report.Scanned.CorruptionTypeBreakdown.TotalHistoryMissing
progressReport.CorruptionTypeBreakdown.TotalOpenExecutionInvalidCurrentExecution += report.Scanned.CorruptionTypeBreakdown.TotalOpenExecutionInvalidCurrentExecution
progressReport.CorruptionTypeBreakdown.TotalInvalidFirstEvent += report.Scanned.CorruptionTypeBreakdown.TotalInvalidFirstEvent
if progressReport.ShardExecutionCountsDistribution.MinExecutions == nil ||
*progressReport.ShardExecutionCountsDistribution.MinExecutions > report.Scanned.TotalExecutionsCount {
if report.Failure == nil && (progressReport.ShardExecutionCountsDistribution.MinExecutions == nil ||
*progressReport.ShardExecutionCountsDistribution.MinExecutions > report.Scanned.TotalExecutionsCount) {
progressReport.ShardExecutionCountsDistribution.MinExecutions = &report.Scanned.TotalExecutionsCount
}
if progressReport.ShardExecutionCountsDistribution.MaxExecutions == nil ||
*progressReport.ShardExecutionCountsDistribution.MaxExecutions < report.Scanned.TotalExecutionsCount {
if report.Failure == nil && (progressReport.ShardExecutionCountsDistribution.MaxExecutions == nil ||
*progressReport.ShardExecutionCountsDistribution.MaxExecutions < report.Scanned.TotalExecutionsCount) {
progressReport.ShardExecutionCountsDistribution.MaxExecutions = &report.Scanned.TotalExecutionsCount
}
progressReport.ShardExecutionCountsDistribution.AverageExecutions = progressReport.TotalExecutionsCount / int64(progressReport.NumberOfShardsFinished)
successfullyFinishedShards := progressReport.NumberOfShardsFinished - progressReport.NumberOfShardScanFailures
if successfullyFinishedShards > 0 {
progressReport.ShardExecutionCountsDistribution.AverageExecutions = progressReport.TotalExecutionsCount / int64(successfullyFinishedShards)
}
}

if progressReport.TotalExecutionsCount > 0 {
progressReport.PercentageCorrupted = math.Round((float64(progressReport.CorruptedExecutionsCount) * 100.0) / float64(progressReport.TotalExecutionsCount))
progressReport.PercentageCheckFailure = math.Round((float64(progressReport.ExecutionCheckFailureCount) * 100.0) / float64(progressReport.TotalExecutionsCount))
progressReport.CorruptionTypeBreakdown.PercentageHistoryMissing = math.Round((float64(progressReport.CorruptionTypeBreakdown.TotalHistoryMissing) * 100.0) / float64(progressReport.TotalExecutionsCount))
progressReport.CorruptionTypeBreakdown.PercentageInvalidStartEvent = math.Round((float64(progressReport.CorruptionTypeBreakdown.TotalInvalidFirstEvent) * 100.0) / float64(progressReport.TotalExecutionsCount))
progressReport.CorruptionTypeBreakdown.PercentageOpenExecutionInvalidCurrentExecution = math.Round((float64(progressReport.CorruptionTypeBreakdown.TotalOpenExecutionInvalidCurrentExecution) * 100.0) / float64(progressReport.TotalExecutionsCount))
}

if progressReport.CorruptedExecutionsCount > 0 {
progressReport.CorruptionTypeBreakdown.PercentageHistoryMissing = math.Round((float64(progressReport.CorruptionTypeBreakdown.TotalHistoryMissing) * 100.0) / float64(progressReport.CorruptedExecutionsCount))
progressReport.CorruptionTypeBreakdown.PercentageInvalidStartEvent = math.Round((float64(progressReport.CorruptionTypeBreakdown.TotalInvalidFirstEvent) * 100.0) / float64(progressReport.CorruptedExecutionsCount))
progressReport.CorruptionTypeBreakdown.PercentageOpenExecutionInvalidCurrentExecution = math.Round((float64(progressReport.CorruptionTypeBreakdown.TotalOpenExecutionInvalidCurrentExecution) * 100.0) / float64(progressReport.CorruptedExecutionsCount))
}

pastTime := time.Now().Sub(startTime)
Expand Down Expand Up @@ -802,3 +817,93 @@ func preconditionForDBCall(totalDBRequests *int64, limiter *quotas.DynamicRateLi
func executionOpen(execution *persistence.InternalWorkflowExecutionInfo) bool {
return execution.State == persistence.WorkflowStateCreated || execution.State == persistence.WorkflowStateRunning
}

// retryListConcreteExecutions lists a page of concrete executions, retrying
// transient persistence errors. Every attempt is rate limited and counted
// against totalDBRequests via preconditionForDBCall.
func retryListConcreteExecutions(
	limiter *quotas.DynamicRateLimiter,
	totalDBRequests *int64,
	execStore persistence.ExecutionStore,
	req *persistence.ListConcreteExecutionsRequest,
) (*persistence.InternalListConcreteExecutionsResponse, error) {
	var response *persistence.InternalListConcreteExecutionsResponse
	attempt := func() error {
		preconditionForDBCall(totalDBRequests, limiter)
		var attemptErr error
		response, attemptErr = execStore.ListConcreteExecutions(req)
		return attemptErr
	}

	// ListConcreteExecutions gets an additional outer retry layer on top of the
	// backoff policy: a failure here aborts the scan of an entire shard, whereas
	// a failure on any other db call only fails a single execution check.
	var lastErr error
	for attemptNum := 0; attemptNum < maxDBRetries; attemptNum++ {
		lastErr = backoff.Retry(attempt, persistenceOperationRetryPolicy, common.IsPersistenceTransientError)
		if lastErr == nil {
			return response, nil
		}
	}
	return nil, lastErr
}

// retryGetWorkflowExecution fetches a concrete workflow execution, retrying
// transient persistence errors with the standard persistence retry policy.
// Every attempt is rate limited and counted against totalDBRequests via
// preconditionForDBCall.
func retryGetWorkflowExecution(
	limiter *quotas.DynamicRateLimiter,
	totalDBRequests *int64,
	execStore persistence.ExecutionStore,
	req *persistence.GetWorkflowExecutionRequest,
) (*persistence.InternalGetWorkflowExecutionResponse, error) {
	var response *persistence.InternalGetWorkflowExecutionResponse
	attempt := func() error {
		preconditionForDBCall(totalDBRequests, limiter)
		var attemptErr error
		response, attemptErr = execStore.GetWorkflowExecution(req)
		return attemptErr
	}

	if err := backoff.Retry(attempt, persistenceOperationRetryPolicy, common.IsPersistenceTransientError); err != nil {
		return nil, err
	}
	return response, nil
}

// retryGetCurrentExecution fetches the current execution record for a
// domain/workflow pair, retrying transient persistence errors with the
// standard persistence retry policy. Every attempt is rate limited and
// counted against totalDBRequests via preconditionForDBCall.
func retryGetCurrentExecution(
	limiter *quotas.DynamicRateLimiter,
	totalDBRequests *int64,
	execStore persistence.ExecutionStore,
	req *persistence.GetCurrentExecutionRequest,
) (*persistence.GetCurrentExecutionResponse, error) {
	var response *persistence.GetCurrentExecutionResponse
	attempt := func() error {
		preconditionForDBCall(totalDBRequests, limiter)
		var attemptErr error
		response, attemptErr = execStore.GetCurrentExecution(req)
		return attemptErr
	}

	if err := backoff.Retry(attempt, persistenceOperationRetryPolicy, common.IsPersistenceTransientError); err != nil {
		return nil, err
	}
	return response, nil
}

// retryReadHistoryBranch reads a page of a history branch, retrying transient
// persistence errors with the standard persistence retry policy. Every attempt
// is rate limited and counted against totalDBRequests via preconditionForDBCall.
func retryReadHistoryBranch(
	limiter *quotas.DynamicRateLimiter,
	totalDBRequests *int64,
	historyStore persistence.HistoryStore,
	req *persistence.InternalReadHistoryBranchRequest,
) (*persistence.InternalReadHistoryBranchResponse, error) {
	var response *persistence.InternalReadHistoryBranchResponse
	attempt := func() error {
		preconditionForDBCall(totalDBRequests, limiter)
		var attemptErr error
		response, attemptErr = historyStore.ReadHistoryBranch(req)
		return attemptErr
	}

	if err := backoff.Retry(attempt, persistenceOperationRetryPolicy, common.IsPersistenceTransientError); err != nil {
		return nil, err
	}
	return response, nil
}

0 comments on commit c7caabd

Please sign in to comment.