Skip to content

Commit

Permalink
use cassandra if statement for start workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Wenquan Xing committed Dec 22, 2017
1 parent 945b2d4 commit f3c7fd6
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 228 deletions.
66 changes: 43 additions & 23 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,19 @@ const (
`IF range_id = ?`

templateUpdateCurrentWorkflowExecutionQuery = `UPDATE executions USING TTL 0 ` +
`SET current_run_id = ?, execution = {run_id: ?, create_request_id: ?}` +
`SET current_run_id = ?, execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?}` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? `
`and task_id = ? ` +
`IF current_run_id = ? `

templateCreateWorkflowExecutionQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?}) IF NOT EXISTS USING TTL 0 `
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) IF NOT EXISTS USING TTL 0 `

templateCreateWorkflowExecutionQuery2 = `INSERT INTO executions (` +
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` +
Expand Down Expand Up @@ -286,7 +287,7 @@ const (
`and visibility_ts = ? ` +
`and task_id = ?`

templateGetCurrentExecutionQuery = `SELECT current_run_id ` +
templateGetCurrentExecutionQuery = `SELECT current_run_id, execution ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand Down Expand Up @@ -792,12 +793,15 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx

if execution, ok := previous["execution"].(map[string]interface{}); ok {
// CreateWorkflowExecution failed because it already exists
executionInfo := createWorkflowExecutionInfo(execution)
msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)",
execution["workflow_id"], execution["run_id"], request.RangeID, strings.Join(columns, ","))
return nil, &workflow.WorkflowExecutionAlreadyStartedError{
Message: common.StringPtr(msg),
StartRequestId: common.StringPtr(fmt.Sprintf("%v", execution["create_request_id"])),
RunId: common.StringPtr(fmt.Sprintf("%v", execution["run_id"])),
request.Execution.GetWorkflowId(), executionInfo.RunID, request.RangeID, strings.Join(columns, ","))
return nil, &WorkflowExecutionAlreadyStartedError{
Msg: msg,
StartRequestID: executionInfo.CreateRequestID,
RunID: executionInfo.RunID,
State: executionInfo.State,
CloseStatus: executionInfo.CloseStatus,
}
}
}
Expand Down Expand Up @@ -827,18 +831,36 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx

func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *CreateWorkflowExecutionRequest,
batch *gocql.Batch, cqlNowTimestamp int64) {

parentDomainID := emptyDomainID
parentWorkflowID := ""
parentRunID := emptyRunID
initiatedID := emptyInitiatedID
state := WorkflowStateRunning
closeStatus := WorkflowCloseStatusNone
if request.ParentExecution != nil {
parentDomainID = request.ParentDomainID
parentWorkflowID = *request.ParentExecution.WorkflowId
parentRunID = *request.ParentExecution.RunId
initiatedID = request.InitiatedID
state = WorkflowStateCreated
}

if request.ContinueAsNew {
batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
*request.Execution.RunId,
*request.Execution.RunId,
request.RequestID,
state,
closeStatus,
d.shardID,
rowTypeExecution,
request.DomainID,
*request.Execution.WorkflowId,
permanentRunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.PreviousRunID,
)
} else {
batch.Query(templateCreateWorkflowExecutionQuery,
Expand All @@ -852,20 +874,11 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat
*request.Execution.RunId,
*request.Execution.RunId,
request.RequestID,
state,
closeStatus,
)
}

parentDomainID := emptyDomainID
parentWorkflowID := ""
parentRunID := emptyRunID
initiatedID := emptyInitiatedID
if request.ParentExecution != nil {
parentDomainID = request.ParentDomainID
parentWorkflowID = *request.ParentExecution.WorkflowId
parentRunID = *request.ParentExecution.RunId
initiatedID = request.InitiatedID
}

batch.Query(templateCreateWorkflowExecutionQuery2,
d.shardID,
request.DomainID,
Expand Down Expand Up @@ -1207,8 +1220,8 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR
defaultVisibilityTimestamp,
rowTypeExecutionTaskID)

var currentRunID string
if err := query.Scan(&currentRunID); err != nil {
result := make(map[string]interface{})
if err := query.MapScan(result); err != nil {
if err == gocql.ErrNotFound {
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v",
Expand All @@ -1225,7 +1238,14 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR
}
}

return &GetCurrentExecutionResponse{RunID: currentRunID}, nil
currentRunID := result["current_run_id"].(gocql.UUID).String()
executionInfo := createWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
return &GetCurrentExecutionResponse{
RunID: currentRunID,
StartRequestID: executionInfo.CreateRequestID,
State: executionInfo.State,
CloseStatus: executionInfo.CloseStatus,
}, nil
}

func (d *cassandraPersistence) GetTransferTasks(request *GetTransferTasksRequest) (*GetTransferTasksResponse, error) {
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func (s *cassandraPersistenceSuite) TestPersistenceStartWorkflow() {
task1, err1 := s.CreateWorkflowExecution(domainID, workflowExecution, "queue1", "wType1", 20, 14, nil, 3, 0, 2, nil)
s.NotNil(err1, "Expected workflow creation to fail.")
log.Infof("Unable to start workflow execution: %v", err1)
startedErr, ok := err1.(*gen.WorkflowExecutionAlreadyStartedError)
startedErr, ok := err1.(*WorkflowExecutionAlreadyStartedError)
s.True(ok)
s.Equal(workflowExecution.RunId, startedErr.RunId, startedErr.Message)
s.Equal(workflowExecution.GetRunId(), startedErr.RunID, startedErr.Msg)
s.Empty(task1, "Expected empty task identifier.")

response, err2 := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{
Expand Down
21 changes: 19 additions & 2 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ type (
Msg string
}

// WorkflowExecutionAlreadyStartedError is returned when creating a new workflow failed.
WorkflowExecutionAlreadyStartedError struct {
Msg string
StartRequestID string
RunID string
State int
CloseStatus int
}

// TimeoutError is returned when a write operation fails due to a timeout
TimeoutError struct {
Msg string
Expand Down Expand Up @@ -371,6 +380,8 @@ type (
DecisionStartedID int64
DecisionStartToCloseTimeout int32
ContinueAsNew bool
PreviousRunID string
ExecutionInfo *WorkflowExecutionInfo
}

// CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest
Expand All @@ -397,8 +408,10 @@ type (

// GetCurrentExecutionResponse is the response to GetCurrentExecution
GetCurrentExecutionResponse struct {
RunID string
ExecutionInfo *WorkflowExecutionInfo
StartRequestID string
RunID string
State int
CloseStatus int
}

// UpdateWorkflowExecutionRequest is used to update a workflow execution
Expand Down Expand Up @@ -718,6 +731,10 @@ func (e *ShardOwnershipLostError) Error() string {
return e.Msg
}

func (e *WorkflowExecutionAlreadyStartedError) Error() string {
return e.Msg
}

func (e *TimeoutError) Error() string {
return e.Msg
}
Expand Down
25 changes: 7 additions & 18 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (s *TestBase) UpdateShard(updatedInfo *ShardInfo, previousRangeID int64) er
// CreateWorkflowExecution is a utility method to create workflow executions
func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution, taskList,
wType string, wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
decisionScheduleID int64, timerTasks []Task) (string, error) {
decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error) {
response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{
RequestID: uuid.New(),
DomainID: domainID,
Expand All @@ -230,17 +230,13 @@ func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution wo
DecisionStartToCloseTimeout: 1,
})

if err != nil {
return "", err
}

return response.TaskID, nil
return response, err
}

// CreateWorkflowExecutionManyTasks is a utility method to create workflow executions
func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution,
taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
decisionScheduleIDs []int64, activityScheduleIDs []int64) (string, error) {
decisionScheduleIDs []int64, activityScheduleIDs []int64) (*CreateWorkflowExecutionResponse, error) {

transferTasks := []Task{}
for _, decisionScheduleID := range decisionScheduleIDs {
Expand Down Expand Up @@ -278,18 +274,14 @@ func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExe
DecisionStartToCloseTimeout: 1,
})

if err != nil {
return "", err
}

return response.TaskID, nil
return response, err
}

// CreateChildWorkflowExecution is a utility method to create child workflow executions
func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecution workflow.WorkflowExecution,
parentDomainID string, parentExecution *workflow.WorkflowExecution, initiatedID int64, taskList, wType string,
wTimeout int32, decisionTimeout int32, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
decisionScheduleID int64, timerTasks []Task) (string, error) {
decisionScheduleID int64, timerTasks []Task) (*CreateWorkflowExecutionResponse, error) {
response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{
RequestID: uuid.New(),
DomainID: domainID,
Expand Down Expand Up @@ -319,11 +311,7 @@ func (s *TestBase) CreateChildWorkflowExecution(domainID string, workflowExecuti
DecisionStartToCloseTimeout: 1,
})

if err != nil {
return "", err
}

return response.TaskID, nil
return response, err
}

// GetWorkflowExecutionInfo is a utility method to retrieve execution info
Expand Down Expand Up @@ -393,6 +381,7 @@ func (s *TestBase) ContinueAsNewExecution(updatedInfo *WorkflowExecutionInfo, co
DecisionStartedID: common.EmptyEventID,
DecisionStartToCloseTimeout: 1,
ContinueAsNew: true,
PreviousRunID: updatedInfo.RunID,
},
})
}
Expand Down
Loading

0 comments on commit f3c7fd6

Please sign in to comment.