diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index c106debe337..8dab111a17b 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -240,18 +240,19 @@ const ( `IF range_id = ?` templateUpdateCurrentWorkflowExecutionQuery = `UPDATE executions USING TTL 0 ` + - `SET current_run_id = ?, execution = {run_id: ?, create_request_id: ?}` + + `SET current_run_id = ?, execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?}` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? ` + `and workflow_id = ? ` + `and run_id = ? ` + `and visibility_ts = ? ` + - `and task_id = ? ` + `and task_id = ? ` + + `IF current_run_id = ? ` templateCreateWorkflowExecutionQuery = `INSERT INTO executions (` + `shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution) ` + - `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?}) IF NOT EXISTS USING TTL 0 ` + `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) IF NOT EXISTS USING TTL 0 ` templateCreateWorkflowExecutionQuery2 = `INSERT INTO executions (` + `shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` + @@ -286,7 +287,7 @@ const ( `and visibility_ts = ? ` + `and task_id = ?` - templateGetCurrentExecutionQuery = `SELECT current_run_id ` + + templateGetCurrentExecutionQuery = `SELECT current_run_id, execution ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -792,12 +793,15 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx if execution, ok := previous["execution"].(map[string]interface{}); ok { // CreateWorkflowExecution failed because it already exists + executionInfo := createWorkflowExecutionInfo(execution) msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)", - execution["workflow_id"], execution["run_id"], request.RangeID, strings.Join(columns, ",")) - return nil, &workflow.WorkflowExecutionAlreadyStartedError{ - Message: common.StringPtr(msg), - StartRequestId: common.StringPtr(fmt.Sprintf("%v", execution["create_request_id"])), - RunId: common.StringPtr(fmt.Sprintf("%v", execution["run_id"])), + request.Execution.GetWorkflowId(), executionInfo.RunID, request.RangeID, strings.Join(columns, ",")) + return nil, &WorkflowExecutionAlreadyStartedError{ + Msg: msg, + StartRequestID: executionInfo.CreateRequestID, + RunID: executionInfo.RunID, + State: executionInfo.State, + CloseStatus: executionInfo.CloseStatus, } } } @@ -827,11 +831,28 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *CreateWorkflowExecutionRequest, batch *gocql.Batch, cqlNowTimestamp int64) { + + parentDomainID := emptyDomainID + parentWorkflowID := "" + parentRunID := emptyRunID + initiatedID := emptyInitiatedID + state := WorkflowStateRunning + closeStatus := WorkflowCloseStatusNone + if request.ParentExecution != nil { + parentDomainID = request.ParentDomainID + parentWorkflowID = *request.ParentExecution.WorkflowId + parentRunID = *request.ParentExecution.RunId + initiatedID = request.InitiatedID + state = WorkflowStateCreated + } + if request.ContinueAsNew { batch.Query(templateUpdateCurrentWorkflowExecutionQuery, *request.Execution.RunId, *request.Execution.RunId, request.RequestID, + state, + closeStatus, d.shardID, rowTypeExecution, request.DomainID, @@ -839,6 +860,7 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat permanentRunID, defaultVisibilityTimestamp, rowTypeExecutionTaskID, + request.PreviousRunID, ) } else { batch.Query(templateCreateWorkflowExecutionQuery, @@ -852,20 +874,11 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat *request.Execution.RunId, *request.Execution.RunId, request.RequestID, + state, + closeStatus, ) } - parentDomainID := emptyDomainID - parentWorkflowID := "" - parentRunID := emptyRunID - initiatedID := emptyInitiatedID - if request.ParentExecution != nil { - parentDomainID = request.ParentDomainID - parentWorkflowID = *request.ParentExecution.WorkflowId - parentRunID = *request.ParentExecution.RunId - initiatedID = request.InitiatedID - } - batch.Query(templateCreateWorkflowExecutionQuery2, d.shardID, request.DomainID, @@ -1207,8 +1220,8 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR defaultVisibilityTimestamp, rowTypeExecutionTaskID) - var currentRunID string - if err := query.Scan(¤tRunID); err != nil { + result := make(map[string]interface{}) + if err := query.MapScan(result); err != nil { if err == gocql.ErrNotFound { return nil, &workflow.EntityNotExistsError{ Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v", @@ -1225,7 +1238,14 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR } } - return &GetCurrentExecutionResponse{RunID: currentRunID}, nil + currentRunID := result["current_run_id"].(gocql.UUID).String() + executionInfo := createWorkflowExecutionInfo(result["execution"].(map[string]interface{})) + return &GetCurrentExecutionResponse{ + RunID: currentRunID, + StartRequestID: executionInfo.CreateRequestID, + State: executionInfo.State, + CloseStatus: executionInfo.CloseStatus, + }, nil } func (d *cassandraPersistence) GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error) { diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index ffc75327bb6..69927df3f9c 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -80,9 +80,9 @@ func (s *cassandraPersistenceSuite) TestPersistenceStartWorkflow() { task1, err1 := s.CreateWorkflowExecution(domainID, workflowExecution, "queue1", "wType1", 20, 14, nil, 3, 0, 2, nil) s.NotNil(err1, "Expected workflow creation to fail.") log.Infof("Unable to start workflow execution: %v", err1) - startedErr, ok := err1.(*gen.WorkflowExecutionAlreadyStartedError) + startedErr, ok := err1.(*WorkflowExecutionAlreadyStartedError) s.True(ok) - s.Equal(workflowExecution.RunId, startedErr.RunId, startedErr.Message) + s.Equal(workflowExecution.GetRunId(), startedErr.RunID, startedErr.Msg) s.Empty(task1, "Expected empty task identifier.") response, err2 := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 26b07fb9604..0a92fa3b5d0 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -94,6 +94,15 @@ type ( Msg string } + // WorkflowExecutionAlreadyStartedError is returned when creating a new workflow failed. + WorkflowExecutionAlreadyStartedError struct { + Msg string + StartRequestID string + RunID string + State int + CloseStatus int + } + // TimeoutError is returned when a write operation fails due to a timeout TimeoutError struct { Msg string @@ -371,6 +380,8 @@ type ( DecisionStartedID int64 DecisionStartToCloseTimeout int32 ContinueAsNew bool + PreviousRunID string + ExecutionInfo *WorkflowExecutionInfo } // CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest @@ -397,8 +408,10 @@ type ( // GetCurrentExecutionResponse is the response to GetCurrentExecution GetCurrentExecutionResponse struct { - RunID string - ExecutionInfo *WorkflowExecutionInfo + StartRequestID string + RunID string + State int + CloseStatus int } // UpdateWorkflowExecutionRequest is used to update a workflow execution @@ -718,6 +731,10 @@ func (e *ShardOwnershipLostError) Error() string { return e.Msg } +func (e *WorkflowExecutionAlreadyStartedError) Error() string { + return e.Msg +} + func (e *TimeoutError) Error() string { return e.Msg } diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index 808546f14fa..5f4fac6abcf 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -203,7 +203,7 @@ func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) er // CreateWorkflowExecution is a utility method to create workflow executions func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList, wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, - decisionScheduleID int64, timerTasks []Task) (string, error) { + decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error) { response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ RequestID: uuid.New(), DomainID: domainID, @@ -230,17 +230,13 @@ func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution wo DecisionStartToCloseTimeout: 1, }) - if err != nil { - return "", err - } - - return response.TaskID, nil + return response, err } // CreateWorkflowExecutionManyTasks is a utility method to create workflow executions func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64, - decisionScheduleIDs []int64, activityScheduleIDs []int64) (string, error) { + decisionScheduleIDs []int64, activityScheduleIDs []int64) (*CreateWorkflowExecutionResponse, error) { transferTasks := []Task{} for _, decisionScheduleID := range decisionScheduleIDs { @@ -278,18 +274,14 @@ func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExe DecisionStartToCloseTimeout: 1, }) - if err != nil { - return "", err - } - - return response.TaskID, nil + return response, err } // CreateChildWorkflowExecution is a utility method to create child workflow executions func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64, - decisionScheduleID int64, timerTasks []Task) (string, error) { + decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error) { response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ RequestID: uuid.New(), DomainID: domainID, @@ -319,11 +311,7 @@ func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecuti DecisionStartToCloseTimeout: 1, }) - if err != nil { - return "", err - } - - return response.TaskID, nil + return response, err } // GetWorkflowExecutionInfo is a utility method to retrieve execution info @@ -393,6 +381,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co DecisionStartedID: common.EmptyEventID, DecisionStartToCloseTimeout: 1, ContinueAsNew: true, + PreviousRunID: updatedInfo.RunID, }, }) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index ff76114d288..d6e6284a700 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -166,60 +166,8 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow } execution := workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(*request.WorkflowId), - } - isBrandNewWorkflow := true - // without setting the run ID so we can get the current workflow. if any - context, release, err := e.historyCache.getOrCreateWorkflowExecution(domainID, execution) - execution.RunId = common.StringPtr(uuid.New()) - - errFn := func(errMsg string, ms *mutableStateBuilder) error { - msg := fmt.Sprintf(errMsg, ms.executionInfo.WorkflowID, ms.executionInfo.RunID) - return &workflow.WorkflowExecutionAlreadyStartedError{ - Message: common.StringPtr(msg), - StartRequestId: common.StringPtr(fmt.Sprintf("%v", ms.executionInfo.CreateRequestID)), - RunId: common.StringPtr(fmt.Sprintf("%v", ms.executionInfo.RunID)), - } - } - - if err != nil { - if _, ok := err.(*workflow.EntityNotExistsError); !ok { - return nil, err - } - // here we know that there is no existing run of this workflow ID, i.e. not running, no history. - } else { - isBrandNewWorkflow = false - defer release() - - msBuilder, err := context.loadWorkflowExecution() - if err != nil { - return nil, err - } - // here we know there is some information about the workflow, i.e. either running right now or has history - // check if the this workflow is finished - if msBuilder.isWorkflowExecutionRunning() { - // if client issue a duplicate request - if msBuilder.executionInfo.CreateRequestID == common.StringDefault(request.RequestId) { - return &workflow.StartWorkflowExecutionResponse{RunId: common.StringPtr(msBuilder.executionInfo.RunID)}, nil - } - msg := "Workflow execution is already running. WorkflowId: %v, RunId: %v." - return nil, errFn(msg, msBuilder) - } - - switch startRequest.StartRequest.GetWorkflowIdReusePolicy() { - case workflow.WorkflowIdReusePolicyAllowDuplicateFailedOnly: - if _, ok := FailedWorkflowCloseState[msBuilder.executionInfo.CloseStatus]; !ok { - msg := "Workflow execution already finished successfully. WorkflowId: %v, RunId: %v." - return nil, errFn(msg, msBuilder) - } - case workflow.WorkflowIdReusePolicyAllowDuplicate: - // as long as workflow not running, so this case has no check - case workflow.WorkflowIdReusePolicyRejectDuplicate: - msg := "Workflow execution already finished. WorkflowId: %v, RunId: %v." - return nil, errFn(msg, msBuilder) - default: - return nil, &workflow.InternalServiceError{Message: "Failed to process start workflow reuse policy."} - } + WorkflowId: request.WorkflowId, + RunId: common.StringPtr(uuid.New()), } var parentExecution *workflow.WorkflowExecution @@ -272,7 +220,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow return nil, serializedError } - err1 := e.shard.AppendHistoryEvents(&persistence.AppendHistoryEventsRequest{ + err = e.shard.AppendHistoryEvents(&persistence.AppendHistoryEventsRequest{ DomainID: domainID, Execution: execution, // It is ok to use 0 for TransactionID because RunID is unique so there are @@ -281,68 +229,124 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow FirstEventID: *startedEvent.EventId, Events: serializedHistory, }) - if err1 != nil { - return nil, err1 + if err != nil { + return nil, err } - _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ - RequestID: common.StringDefault(request.RequestId), - DomainID: domainID, - Execution: execution, - ParentDomainID: parentDomainID, - ParentExecution: parentExecution, - InitiatedID: initiatedID, - TaskList: *request.TaskList.Name, - WorkflowTypeName: *request.WorkflowType.Name, - WorkflowTimeout: *request.ExecutionStartToCloseTimeoutSeconds, - DecisionTimeoutValue: *request.TaskStartToCloseTimeoutSeconds, - ExecutionContext: nil, - NextEventID: msBuilder.GetNextEventID(), - LastProcessedEvent: emptyEventID, - TransferTasks: transferTasks, - DecisionScheduleID: decisionScheduleID, - DecisionStartedID: decisionStartID, - DecisionStartToCloseTimeout: decisionTimeout, - ContinueAsNew: !isBrandNewWorkflow, - TimerTasks: timerTasks, - }) + deleteEvents := func() { + // We created the history events but failed to create workflow execution, so cleanup the history which could cause + // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they + // are always created for a unique run_id which is not visible beyond this call yet. + // TODO: Handle error on deletion of execution history + e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ + DomainID: domainID, + Execution: execution, + }) + } - if err != nil { - switch t := err.(type) { - case *workflow.WorkflowExecutionAlreadyStartedError: - // We created the history events but failed to create workflow execution, so cleanup the history which could cause - // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they - // are always created for a unique run_id which is not visible beyond this call yet. - // TODO: Handle error on deletion of execution history - e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ - DomainID: domainID, - Execution: execution, - }) + createWorkflow := func(isBrandNew bool, prevRunID string) (string, error) { + _, err = e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ + RequestID: common.StringDefault(request.RequestId), + DomainID: domainID, + Execution: execution, + ParentDomainID: parentDomainID, + ParentExecution: parentExecution, + InitiatedID: initiatedID, + TaskList: *request.TaskList.Name, + WorkflowTypeName: *request.WorkflowType.Name, + WorkflowTimeout: *request.ExecutionStartToCloseTimeoutSeconds, + DecisionTimeoutValue: *request.TaskStartToCloseTimeoutSeconds, + ExecutionContext: nil, + NextEventID: msBuilder.GetNextEventID(), + LastProcessedEvent: emptyEventID, + TransferTasks: transferTasks, + DecisionScheduleID: decisionScheduleID, + DecisionStartedID: decisionStartID, + DecisionStartToCloseTimeout: decisionTimeout, + TimerTasks: timerTasks, + ContinueAsNew: !isBrandNew, + PreviousRunID: prevRunID, + }) - if common.StringDefault(t.StartRequestId) == common.StringDefault(request.RequestId) { - return &workflow.StartWorkflowExecutionResponse{ - RunId: t.RunId, - }, nil + if err != nil { + switch t := err.(type) { + case *persistence.WorkflowExecutionAlreadyStartedError: + if t.StartRequestID == common.StringDefault(request.RequestId) { + deleteEvents() + return t.RunID, nil + } + case *persistence.ShardOwnershipLostError: + deleteEvents() } - case *persistence.ShardOwnershipLostError: - // We created the history events but failed to create workflow execution, so cleanup the history which could cause - // us to leak history events which are never cleaned up. Cleaning up the events is absolutely safe here as they - // are always created for a unique run_id which is not visible beyond this call yet. - // TODO: Handle error on deletion of execution history - e.historyMgr.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{ - DomainID: domainID, - Execution: execution, - }) + return "", err } + return execution.GetRunId(), nil + } - return nil, err + workflowExistsErrHandler := func(err *persistence.WorkflowExecutionAlreadyStartedError) error { + // set the prev run ID for database conditional update + prevStartRequestID := err.StartRequestID + prevRunID := err.RunID + prevState := err.State + prevCloseState := err.CloseStatus + + errFn := func(errMsg string, createRequestID string, workflowID string, runID string) error { + msg := fmt.Sprintf(errMsg, workflowID, runID) + return &workflow.WorkflowExecutionAlreadyStartedError{ + Message: common.StringPtr(msg), + StartRequestId: common.StringPtr(fmt.Sprintf("%v", createRequestID)), + RunId: common.StringPtr(fmt.Sprintf("%v", runID)), + } + } + + // here we know there is some information about the prev workflow, i.e. either running right now + // or has history check if the this workflow is finished + if prevState != persistence.WorkflowStateCompleted { + deleteEvents() + msg := "Workflow execution is already running. WorkflowId: %v, RunId: %v." + return errFn(msg, prevStartRequestID, execution.GetWorkflowId(), prevRunID) + } + switch startRequest.StartRequest.GetWorkflowIdReusePolicy() { + case workflow.WorkflowIdReusePolicyAllowDuplicateFailedOnly: + if _, ok := FailedWorkflowCloseState[prevCloseState]; !ok { + deleteEvents() + msg := "Workflow execution already finished successfully. WorkflowId: %v, RunId: %v." + return errFn(msg, prevStartRequestID, execution.GetWorkflowId(), prevRunID) + } + case workflow.WorkflowIdReusePolicyAllowDuplicate: + // as long as workflow not running, so this case has no check + case workflow.WorkflowIdReusePolicyRejectDuplicate: + deleteEvents() + msg := "Workflow execution already finished. WorkflowId: %v, RunId: %v." + return errFn(msg, prevStartRequestID, execution.GetWorkflowId(), prevRunID) + default: + deleteEvents() + return &workflow.InternalServiceError{Message: "Failed to process start workflow reuse policy."} + } + + return nil + } + + // try to create the workflow execution + isBrandNew := true + resultRunID := "" + resultRunID, err = createWorkflow(isBrandNew, "") + // if err still non nil, see if retry + if errExist, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { + if err = workflowExistsErrHandler(errExist); err == nil { + isBrandNew = false + resultRunID, err = createWorkflow(isBrandNew, errExist.RunID) + } } - e.timerProcessor.NotifyNewTimer(timerTasks) + if err == nil { + e.timerProcessor.NotifyNewTimer(timerTasks) - return &workflow.StartWorkflowExecutionResponse{ - RunId: execution.RunId, - }, nil + return &workflow.StartWorkflowExecutionResponse{ + RunId: common.StringPtr(resultRunID), + }, nil + } + return nil, err } // GetMutableState retrieves the mutable state of the workflow execution diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index d916e0fac15..0a8765f1af4 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -701,7 +701,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_BrandNew() { taskList := "testTaskList" identity := "testIdentity" - s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(nil, &workflow.EntityNotExistsError{}).Once() s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() @@ -730,30 +729,22 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() { identity := "testIdentity" requestID := "requestID" - workflowExecution := workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - RunId: common.StringPtr(runID), - } - - s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once() - s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return( - &persistence.GetWorkflowExecutionResponse{ - State: &persistence.WorkflowMutableState{ - ExecutionInfo: &persistence.WorkflowExecutionInfo{ - CreateRequestID: requestID, - State: persistence.WorkflowStateRunning, - CloseStatus: persistence.WorkflowCloseStatusNone, - }, - }, - }, - nil, - ).Once() + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: requestID, + RunID: runID, + State: persistence.WorkflowStateRunning, + CloseStatus: persistence.WorkflowCloseStatusNone, + }).Once() + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ DomainUUID: common.StringPtr(domainID), StartRequest: &workflow.StartWorkflowExecutionRequest{ Domain: common.StringPtr(domainID), - WorkflowId: common.StringPtr(*workflowExecution.WorkflowId), + WorkflowId: common.StringPtr(workflowID), WorkflowType: &workflow.WorkflowType{Name: common.StringPtr(workflowType)}, TaskList: &workflow.TaskList{Name: common.StringPtr(taskList)}, ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(1), @@ -763,7 +754,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() { }, }) s.Nil(err) - s.NotNil(resp.RunId) + s.Equal(runID, resp.GetRunId()) } func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { @@ -774,19 +765,16 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() { taskList := "testTaskList" identity := "testIdentity" - s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Once() - s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return( - &persistence.GetWorkflowExecutionResponse{ - State: &persistence.WorkflowMutableState{ - ExecutionInfo: &persistence.WorkflowExecutionInfo{ - CreateRequestID: "oldRequestID", - State: persistence.WorkflowStateRunning, - CloseStatus: persistence.WorkflowCloseStatusNone, - }, - }, - }, - nil, - ).Once() + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Once() + s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: "oldRequestID", + RunID: runID, + State: persistence.WorkflowStateRunning, + CloseStatus: persistence.WorkflowCloseStatusNone, + }).Once() + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ DomainUUID: common.StringPtr(domainID), @@ -821,25 +809,27 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() { expecedErrs := []bool{true, false, true} - s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(&persistence.GetCurrentExecutionResponse{RunID: runID}, nil).Times(len(expecedErrs)) - // since cache is enabled in the test suit, this will only be executed once - s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return( - &persistence.GetWorkflowExecutionResponse{ - State: &persistence.WorkflowMutableState{ - ExecutionInfo: &persistence.WorkflowExecutionInfo{ - CreateRequestID: "oldRequestID", - State: persistence.WorkflowStateCompleted, - CloseStatus: persistence.WorkflowCloseStatusCompleted, - }, - }, - }, - nil, - ).Once() - for index, option := range options { + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }), + ).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: "oldRequestID", + RunID: runID, + State: persistence.WorkflowStateCompleted, + CloseStatus: persistence.WorkflowCloseStatusCompleted, + }).Times(len(expecedErrs)) + for index, option := range options { if !expecedErrs[index] { - s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() - s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == true }), + ).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() + } else { + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() } resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ @@ -874,10 +864,6 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() { taskList := "testTaskList" identity := "testIdentity" - execution := workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - } - options := []workflow.WorkflowIdReusePolicy{ workflow.WorkflowIdReusePolicyAllowDuplicateFailedOnly, workflow.WorkflowIdReusePolicyAllowDuplicate, @@ -895,29 +881,29 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() { runIDs := []string{"1", "2", "3", "4"} for i, closeState := range closeStates { - s.mockExecutionMgr.On("GetCurrentExecution", mock.Anything).Return(&persistence.GetCurrentExecutionResponse{RunID: runIDs[i]}, nil).Times(len(expecedErrs)) - // since cache is enabled in the test suit, this will only be executed once - execution.RunId = common.StringPtr(runIDs[i]) - s.mockExecutionMgr.On("GetWorkflowExecution", &persistence.GetWorkflowExecutionRequest{ - DomainID: domainID, - Execution: execution, - }).Return( - &persistence.GetWorkflowExecutionResponse{ - State: &persistence.WorkflowMutableState{ - ExecutionInfo: &persistence.WorkflowExecutionInfo{ - CreateRequestID: "oldRequestID", - State: persistence.WorkflowStateCompleted, - CloseStatus: closeState, - }, - }, - }, - nil, - ).Once() + + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockShardManager.On("UpdateShard", mock.Anything).Return(nil).Times(len(expecedErrs)) + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == false }), + ).Return(nil, &persistence.WorkflowExecutionAlreadyStartedError{ + Msg: "random message", + StartRequestID: "oldRequestID", + RunID: runIDs[i], + State: persistence.WorkflowStateCompleted, + CloseStatus: closeState, + }).Times(len(expecedErrs)) + for j, option := range options { if !expecedErrs[j] { - s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() - s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() + s.mockExecutionMgr.On( + "CreateWorkflowExecution", + mock.MatchedBy(func(request *persistence.CreateWorkflowExecutionRequest) bool { return request.ContinueAsNew == true }), + ).Return(&persistence.CreateWorkflowExecutionResponse{TaskID: uuid.New()}, nil).Once() + } else { + s.mockHistoryMgr.On("DeleteWorkflowExecutionHistory", mock.Anything).Return(nil).Once() } resp, err := s.historyEngine.StartWorkflowExecution(&h.StartWorkflowExecutionRequest{ diff --git a/service/history/mutableStateBuilder.go b/service/history/mutableStateBuilder.go index b1d9b472dba..6e514a99fe9 100644 --- a/service/history/mutableStateBuilder.go +++ b/service/history/mutableStateBuilder.go @@ -1398,7 +1398,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int "{OutStandingActivityTasks: %v, HasPendingDecision: %v}", len(e.pendingActivityInfoIDs), e.HasPendingDecisionTask())) } - + prevRunID := e.executionInfo.RunID e.executionInfo.State = persistence.WorkflowStateCompleted e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusContinuedAsNew newExecution := workflow.WorkflowExecution{ @@ -1453,6 +1453,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int DecisionStartedID: di.StartedID, DecisionStartToCloseTimeout: di.DecisionTimeout, ContinueAsNew: true, + PreviousRunID: prevRunID, } return e.hBuilder.AddContinuedAsNewEvent(decisionCompletedEventID, newRunID, attributes), newStateBuilder, nil