Skip to content

Commit

Permalink
Use version history to get branch token as fall back (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored and mkolodezny committed Apr 16, 2020
1 parent c7caabd commit 13265ee
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 74 deletions.
15 changes: 6 additions & 9 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ workflow_state = ? ` +
`and visibility_ts = ? ` +
`and task_id = ?`

templateListWorkflowExecutionQuery = `SELECT run_id, execution ` +
templateListWorkflowExecutionQuery = `SELECT run_id, execution, version_histories, version_histories_encoding ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ?`
Expand Down Expand Up @@ -1311,10 +1311,7 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *p.GetWorkflowExecut
replicationState := createReplicationState(result["replication_state"].(map[string]interface{}))
state.ReplicationState = replicationState

state.VersionHistories = p.NewDataBlob(
result["version_histories"].([]byte),
common.EncodingType(result["version_histories_encoding"].(string)),
)
state.VersionHistories = p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string)))

if state.VersionHistories != nil && state.ReplicationState != nil {
return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -2062,10 +2059,10 @@ func (d *cassandraPersistence) ListConcreteExecutions(
result = make(map[string]interface{})
continue
}
if _, ok := result["execution"]; ok {
wfInfo := createWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
response.ExecutionInfos = append(response.ExecutionInfos, wfInfo)
}
response.Executions = append(response.Executions, &p.InternalListConcreteExecutionsEntity{
ExecutionInfo: createWorkflowExecutionInfo(result["execution"].(map[string]interface{})),
VersionHistories: p.NewDataBlob(result["version_histories"].([]byte), common.EncodingType(result["version_histories_encoding"].(string))),
})
result = make(map[string]interface{})
}
nextPageToken := iter.PageState()
Expand Down
10 changes: 8 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,8 +792,14 @@ type (

// ListConcreteExecutionsResponse is response to ListConcreteExecutions
ListConcreteExecutionsResponse struct {
ExecutionInfos []*WorkflowExecutionInfo
PageToken []byte
Executions []*ListConcreteExecutionsEntity
PageToken []byte
}

// ListConcreteExecutionsEntity is a single entity in ListConcreteExecutionsResponse
ListConcreteExecutionsEntity struct {
ExecutionInfo *WorkflowExecutionInfo
VersionHistories *VersionHistories
}

// GetCurrentExecutionResponse is the response to GetCurrentExecution
Expand Down
16 changes: 12 additions & 4 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,14 +786,22 @@ func (m *executionManagerImpl) ListConcreteExecutions(
return nil, err
}
newResponse := &ListConcreteExecutionsResponse{
ExecutionInfos: make([]*WorkflowExecutionInfo, len(response.ExecutionInfos), len(response.ExecutionInfos)),
PageToken: response.NextPageToken,
Executions: make([]*ListConcreteExecutionsEntity, len(response.Executions), len(response.Executions)),
PageToken: response.NextPageToken,
}
for i, info := range response.ExecutionInfos {
newResponse.ExecutionInfos[i], _, err = m.DeserializeExecutionInfo(info)
for i, e := range response.Executions {
info, _, err := m.DeserializeExecutionInfo(e.ExecutionInfo)
if err != nil {
return nil, err
}
vh, err := m.DeserializeVersionHistories(e.VersionHistories)
if err != nil {
return nil, err
}
newResponse.Executions[i] = &ListConcreteExecutionsEntity{
ExecutionInfo: info,
VersionHistories: vh,
}
}
return newResponse, nil
}
Expand Down
10 changes: 8 additions & 2 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,14 @@ type (

// InternalListConcreteExecutionsResponse is the response to ListConcreteExecutions for Persistence Interface
InternalListConcreteExecutionsResponse struct {
ExecutionInfos []*InternalWorkflowExecutionInfo
NextPageToken []byte
Executions []*InternalListConcreteExecutionsEntity
NextPageToken []byte
}

// InternalListConcreteExecutionsEntity is a single entity in InternalListConcreteExecutionsResponse
InternalListConcreteExecutionsEntity struct {
ExecutionInfo *InternalWorkflowExecutionInfo
VersionHistories *DataBlob
}

// InternalForkHistoryBranchRequest is used to fork a history branch
Expand Down
142 changes: 85 additions & 57 deletions tools/cli/adminDBScanCommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package cli
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"os"
Expand Down Expand Up @@ -293,7 +294,10 @@ func scanShard(
return report
}
token = resp.NextPageToken
for _, e := range resp.ExecutionInfos {
for _, e := range resp.Executions {
if e == nil || e.ExecutionInfo == nil {
continue
}
if report.Scanned == nil {
report.Scanned = &ShardScanReportExecutionsScanned{}
}
Expand All @@ -307,7 +311,8 @@ func scanShard(
limiter,
historyStore,
&report.TotalDBRequests,
execStore)
execStore,
payloadSerializer)
switch historyVerificationResult {
case VerificationResultNoCorruption:
// nothing to do just keep checking other conditions
Expand Down Expand Up @@ -370,7 +375,7 @@ func scanShard(
}

func verifyHistoryExists(
execution *persistence.InternalWorkflowExecutionInfo,
execution *persistence.InternalListConcreteExecutionsEntity,
branchDecoder *codec.ThriftRWEncoder,
corruptedExecutionWriter BufferedWriter,
checkFailureWriter BufferedWriter,
Expand All @@ -379,16 +384,16 @@ func verifyHistoryExists(
historyStore persistence.HistoryStore,
totalDBRequests *int64,
execStore persistence.ExecutionStore,
payloadSerializer persistence.PayloadSerializer,
) (VerificationResult, *persistence.InternalReadHistoryBranchResponse, *shared.HistoryBranch) {
var branch shared.HistoryBranch
err := branchDecoder.Decode(execution.BranchToken, &branch)
branch, err := getHistoryBranch(execution, payloadSerializer, branchDecoder)
if err != nil {
checkFailureWriter.Add(&ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
Note: "failed to decode branch token",
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
Note: "failed to get history branch",
Details: err.Error(),
})
return VerificationResultCheckFailure, nil, nil
Expand All @@ -403,7 +408,7 @@ func verifyHistoryExists(
}
history, err := retryReadHistoryBranch(limiter, totalDBRequests, historyStore, readHistoryBranchReq)

ecf, stillExists := concreteExecutionStillExists(execution, shardID, execStore, limiter, totalDBRequests)
ecf, stillExists := concreteExecutionStillExists(execution.ExecutionInfo, shardID, execStore, limiter, totalDBRequests)
if ecf != nil {
checkFailureWriter.Add(ecf)
return VerificationResultCheckFailure, nil, nil
Expand All @@ -416,13 +421,13 @@ func verifyHistoryExists(
if err == gocql.ErrNotFound {
corruptedExecutionWriter.Add(&CorruptedExecution{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
NextEventID: execution.NextEventID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
NextEventID: execution.ExecutionInfo.NextEventID,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
CloseStatus: execution.CloseStatus,
CloseStatus: execution.ExecutionInfo.CloseStatus,
CorruptedExceptionMetadata: CorruptedExceptionMetadata{
CorruptionType: HistoryMissing,
Note: "detected history missing based on gocql.ErrNotFound",
Expand All @@ -433,35 +438,35 @@ func verifyHistoryExists(
}
checkFailureWriter.Add(&ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
Note: "failed to read history branch with error other than gocql.ErrNotFond",
Details: err.Error(),
})
return VerificationResultCheckFailure, nil, nil
} else if history == nil || len(history.History) == 0 {
corruptedExecutionWriter.Add(&CorruptedExecution{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
NextEventID: execution.NextEventID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
NextEventID: execution.ExecutionInfo.NextEventID,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
CloseStatus: execution.CloseStatus,
CloseStatus: execution.ExecutionInfo.CloseStatus,
CorruptedExceptionMetadata: CorruptedExceptionMetadata{
CorruptionType: HistoryMissing,
Note: "got empty history",
},
})
return VerificationResultDetectedCorruption, nil, nil
}
return VerificationResultNoCorruption, history, &branch
return VerificationResultNoCorruption, history, branch
}

func verifyFirstHistoryEvent(
execution *persistence.InternalWorkflowExecutionInfo,
execution *persistence.InternalListConcreteExecutionsEntity,
branch *shared.HistoryBranch,
corruptedExecutionWriter BufferedWriter,
checkFailureWriter BufferedWriter,
Expand All @@ -473,23 +478,23 @@ func verifyFirstHistoryEvent(
if err != nil || len(firstBatch) == 0 {
checkFailureWriter.Add(&ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
Note: "failed to deserialize batch events",
Details: err.Error(),
})
return VerificationResultCheckFailure
} else if firstBatch[0].GetEventId() != common.FirstEventID {
corruptedExecutionWriter.Add(&CorruptedExecution{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
NextEventID: execution.NextEventID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
NextEventID: execution.ExecutionInfo.NextEventID,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
CloseStatus: execution.CloseStatus,
CloseStatus: execution.ExecutionInfo.CloseStatus,
CorruptedExceptionMetadata: CorruptedExceptionMetadata{
CorruptionType: InvalidFirstEvent,
Note: "got unexpected first eventID",
Expand All @@ -500,13 +505,13 @@ func verifyFirstHistoryEvent(
} else if firstBatch[0].GetEventType() != shared.EventTypeWorkflowExecutionStarted {
corruptedExecutionWriter.Add(&CorruptedExecution{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
NextEventID: execution.NextEventID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
NextEventID: execution.ExecutionInfo.NextEventID,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
CloseStatus: execution.CloseStatus,
CloseStatus: execution.ExecutionInfo.CloseStatus,
CorruptedExceptionMetadata: CorruptedExceptionMetadata{
CorruptionType: InvalidFirstEvent,
Note: "got unexpected first eventType",
Expand All @@ -519,7 +524,7 @@ func verifyFirstHistoryEvent(
}

func verifyCurrentExecution(
execution *persistence.InternalWorkflowExecutionInfo,
execution *persistence.InternalListConcreteExecutionsEntity,
corruptedExecutionWriter BufferedWriter,
checkFailureWriter BufferedWriter,
shardID int,
Expand All @@ -528,16 +533,16 @@ func verifyCurrentExecution(
limiter *quotas.DynamicRateLimiter,
totalDBRequests *int64,
) VerificationResult {
if !executionOpen(execution) {
if !executionOpen(execution.ExecutionInfo) {
return VerificationResultNoCorruption
}
getCurrentExecutionRequest := &persistence.GetCurrentExecutionRequest{
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
}
currentExecution, err := retryGetCurrentExecution(limiter, totalDBRequests, execStore, getCurrentExecutionRequest)

ecf, stillOpen := concreteExecutionStillOpen(execution, shardID, execStore, limiter, totalDBRequests)
ecf, stillOpen := concreteExecutionStillOpen(execution.ExecutionInfo, shardID, execStore, limiter, totalDBRequests)
if ecf != nil {
checkFailureWriter.Add(ecf)
return VerificationResultCheckFailure
Expand All @@ -551,13 +556,13 @@ func verifyCurrentExecution(
case *shared.EntityNotExistsError:
corruptedExecutionWriter.Add(&CorruptedExecution{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
NextEventID: execution.NextEventID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
NextEventID: execution.ExecutionInfo.NextEventID,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
CloseStatus: execution.CloseStatus,
CloseStatus: execution.ExecutionInfo.CloseStatus,
CorruptedExceptionMetadata: CorruptedExceptionMetadata{
CorruptionType: OpenExecutionInvalidCurrentExecution,
Note: "execution is open without having a current execution",
Expand All @@ -568,24 +573,24 @@ func verifyCurrentExecution(
default:
checkFailureWriter.Add(&ExecutionCheckFailure{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
Note: "failed to access current execution but could not confirm that it does not exist",
Details: err.Error(),
})
return VerificationResultCheckFailure
}
} else if currentExecution.RunID != execution.RunID {
} else if currentExecution.RunID != execution.ExecutionInfo.RunID {
corruptedExecutionWriter.Add(&CorruptedExecution{
ShardID: shardID,
DomainID: execution.DomainID,
WorkflowID: execution.WorkflowID,
RunID: execution.RunID,
NextEventID: execution.NextEventID,
DomainID: execution.ExecutionInfo.DomainID,
WorkflowID: execution.ExecutionInfo.WorkflowID,
RunID: execution.ExecutionInfo.RunID,
NextEventID: execution.ExecutionInfo.NextEventID,
TreeID: branch.GetTreeID(),
BranchID: branch.GetBranchID(),
CloseStatus: execution.CloseStatus,
CloseStatus: execution.ExecutionInfo.CloseStatus,
CorruptedExceptionMetadata: CorruptedExceptionMetadata{
CorruptionType: OpenExecutionInvalidCurrentExecution,
Note: "found open execution for which there exists current execution pointing at a different concrete execution",
Expand Down Expand Up @@ -907,3 +912,26 @@ func retryReadHistoryBranch(
}
return resp, nil
}

func getHistoryBranch(
e *persistence.InternalListConcreteExecutionsEntity,
payloadSerializer persistence.PayloadSerializer,
branchDecoder *codec.ThriftRWEncoder,
) (*shared.HistoryBranch, error) {
branchTokenBytes := e.ExecutionInfo.BranchToken
if len(branchTokenBytes) == 0 {
if e.VersionHistories == nil {
return nil, errors.New("failed to get branch token")
}
vh, err := payloadSerializer.DeserializeVersionHistories(e.VersionHistories)
if err != nil {
return nil, err
}
branchTokenBytes = vh.GetHistories()[vh.GetCurrentVersionHistoryIndex()].GetBranchToken()
}
var branch shared.HistoryBranch
if err := branchDecoder.Decode(branchTokenBytes, &branch); err != nil {
return nil, err
}
return &branch, nil
}

0 comments on commit 13265ee

Please sign in to comment.