Skip to content

Commit

Permalink
Fix NDC resetter persistence bugs (#3500)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Sep 11, 2020
1 parent 2fc957e commit 3b56ad7
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
14 changes: 6 additions & 8 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,6 @@ func (m *sqlExecutionManager) createWorkflowExecutionTx(
return nil, err
}

switch request.Mode {
case p.CreateWorkflowModeContinueAsNew:
// cannot create workflow with continue as new mode
return nil, &workflow.InternalServiceError{
Message: "CreateWorkflowExecution: operation failed, encounter invalid CreateWorkflowModeContinueAsNew",
}
}

var err error
var row *sqlplugin.CurrentExecutionsRow
if row, err = lockCurrentExecutionIfExists(tx, m.shardID, domainID, workflowID); err != nil {
Expand Down Expand Up @@ -179,6 +171,12 @@ func (m *sqlExecutionManager) createWorkflowExecutionTx(
return nil, err
}

case p.CreateWorkflowModeContinueAsNew:
// continueAsNew mode expects a current run exists
if err := assertRunIDMismatch(sqlplugin.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
return nil, err
}

default:
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf(
Expand Down
11 changes: 8 additions & 3 deletions service/history/reset/resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,12 @@ func (r *workflowResetterImpl) persistToDB(
if err != nil {
return err
}
resetHistorySize, err := resetWorkflow.GetContext().PersistFirstWorkflowEvents(resetWorkflowEventsSeq[0])
if len(resetWorkflowEventsSeq) != 1{
return &shared.InternalServiceError{
Message: "there should be EXACTLY one batch of events for reset",
}
}
resetHistorySize, err := resetWorkflow.GetContext().PersistNonFirstWorkflowEvents(resetWorkflowEventsSeq[0])
if err != nil {
return err
}
Expand All @@ -312,7 +317,7 @@ func (r *workflowResetterImpl) replayResetWorkflow(
resetRequestID string,
) (execution.Workflow, error) {

resetBranchToken, err := r.generateBranchToken(
resetBranchToken, err := r.forkAndGenerateBranchToken(
domainID,
workflowID,
baseBranchToken,
Expand Down Expand Up @@ -399,7 +404,7 @@ func (r *workflowResetterImpl) failInflightActivity(
return nil
}

func (r *workflowResetterImpl) generateBranchToken(
func (r *workflowResetterImpl) forkAndGenerateBranchToken(
domainID string,
workflowID string,
forkBranchToken []byte,
Expand Down
4 changes: 2 additions & 2 deletions service/history/reset/resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (s *workflowResetterSuite) TestPersistToDB_CurrentNotTerminated() {
gomock.Any(),
execution.TransactionPolicyActive,
).Return(resetSnapshot, resetEventsSeq, nil).Times(1)
resetContext.EXPECT().PersistFirstWorkflowEvents(resetEventsSeq[0]).Return(resetEventsSize, nil).Times(1)
resetContext.EXPECT().PersistNonFirstWorkflowEvents(resetEventsSeq[0]).Return(resetEventsSize, nil).Times(1)
resetContext.EXPECT().CreateWorkflowExecution(
resetSnapshot,
resetEventsSize,
Expand Down Expand Up @@ -312,7 +312,7 @@ func (s *workflowResetterSuite) TestGenerateBranchToken() {
ShardID: common.IntPtr(s.mockShard.GetShardID()),
}).Return(&persistence.ForkHistoryBranchResponse{NewBranchToken: resetBranchToken}, nil).Times(1)

newBranchToken, err := s.workflowResetter.generateBranchToken(
newBranchToken, err := s.workflowResetter.forkAndGenerateBranchToken(
s.domainID, s.workflowID, baseBranchToken, baseNodeID, s.resetRunID,
)
s.NoError(err)
Expand Down

0 comments on commit 3b56ad7

Please sign in to comment.