Skip to content

Commit

Permalink
Merge branch 'master' into cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Feb 16, 2018
2 parents d0cd134 + 24b89d9 commit 4a2609a
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 75 deletions.
5 changes: 3 additions & 2 deletions .gen/go/shared/idl.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions .gen/go/shared/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions common/cassandra_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@ package common

import (
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/uber/cadence/common/logging"

"io/ioutil"
"os"
"github.com/uber/cadence/tools/cassandra"

"github.com/gocql/gocql"
log "github.com/sirupsen/logrus"
"github.com/uber/cadence/tools/cassandra"
)

// NewCassandraCluster creates a cassandra cluster given comma separated list of clusterHosts
Expand Down Expand Up @@ -89,7 +88,9 @@ func DropCassandraKeyspace(s *gocql.Session, keyspace string) (err error) {
}

// LoadCassandraSchema loads the schema from the given .cql files on this keyspace
func LoadCassandraSchema(dir string, fileNames []string, keyspace string, override bool) (err error) {
func LoadCassandraSchema(
dir string, fileNames []string, port int, keyspace string, override bool,
) (err error) {

tmpFile, err := ioutil.TempFile("", "_cadence_")
if err != nil {
Expand All @@ -111,6 +112,7 @@ func LoadCassandraSchema(dir string, fileNames []string, keyspace string, overri
config := &cassandra.SetupSchemaConfig{
BaseConfig: cassandra.BaseConfig{
CassHosts: "127.0.0.1",
CassPort: port,
CassKeyspace: keyspace,
},
SchemaFilePath: tmpFile.Name(),
Expand Down
35 changes: 20 additions & 15 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import (
"sync/atomic"
"time"

"github.com/gocql/gocql"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"github.com/uber-common/bark"

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/logging"

"github.com/gocql/gocql"
"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"github.com/uber-common/bark"
)

const (
Expand Down Expand Up @@ -93,6 +93,7 @@ type (

// CassandraTestCluster allows executing cassandra operations in testing.
CassandraTestCluster struct {
port int
keyspace string
cluster *gocql.ClusterConfig
session *gocql.Session
Expand Down Expand Up @@ -123,7 +124,7 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) {
testAllClusterNames,
)
// Setup Workflow keyspace and deploy schema for tests
s.CassandraTestCluster.setupTestCluster(options.KeySpace, options.DropKeySpace, options.SchemaDir)
s.CassandraTestCluster.setupTestCluster(options)
shardID := 0
var err error
s.ShardMgr, err = NewCassandraShardPersistence(options.ClusterHost, options.ClusterPort, options.ClusterUser,
Expand Down Expand Up @@ -843,23 +844,27 @@ func (s *TestBase) ClearTransferQueue() {
atomic.StoreInt64(&s.readLevel, 0)
}

func (s *CassandraTestCluster) setupTestCluster(keySpace string, dropKeySpace bool, schemaDir string) {
func (s *CassandraTestCluster) setupTestCluster(options TestBaseOptions) {
keySpace := options.KeySpace
if keySpace == "" {
keySpace = generateRandomKeyspace(10)
}
s.createCluster(testWorkflowClusterHosts, testPort, testUser, testPassword, testDatacenter, gocql.Consistency(1),
keySpace)
s.createKeyspace(1, dropKeySpace)
s.loadSchema([]string{"schema.cql"}, schemaDir)
s.loadVisibilitySchema([]string{"schema.cql"}, schemaDir)
s.createCluster(
testWorkflowClusterHosts, options.ClusterPort, testUser, testPassword, testDatacenter,
gocql.Consistency(1), keySpace,
)
s.createKeyspace(1, options.DropKeySpace)
s.loadSchema([]string{"schema.cql"}, options.SchemaDir)
s.loadVisibilitySchema([]string{"schema.cql"}, options.SchemaDir)
}

func (s *CassandraTestCluster) tearDownTestCluster() {
s.dropKeyspace()
s.session.Close()
}

func (s *CassandraTestCluster) createCluster(clusterHosts string, port int, user, password, dc string,
func (s *CassandraTestCluster) createCluster(
clusterHosts string, port int, user, password, dc string,
cons gocql.Consistency, keyspace string) {
s.cluster = common.NewCassandraCluster(clusterHosts, port, user, password, dc)
s.cluster.Consistency = cons
Expand Down Expand Up @@ -895,7 +900,7 @@ func (s *CassandraTestCluster) loadSchema(fileNames []string, schemaDir string)
workflowSchemaDir = schemaDir + "/schema/cadence"
}

err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace, true)
err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.cluster.Port, s.keyspace, true)
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
log.Fatal(err)
}
Expand All @@ -907,7 +912,7 @@ func (s *CassandraTestCluster) loadVisibilitySchema(fileNames []string, schemaDi
workflowSchemaDir = schemaDir + "/schema/visibility"
}

err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.keyspace, false)
err := common.LoadCassandraSchema(workflowSchemaDir, fileNames, s.cluster.Port, s.keyspace, false)
if err != nil && !strings.Contains(err.Error(), "AlreadyExists") {
log.Fatal(err)
}
Expand Down
15 changes: 3 additions & 12 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3748,7 +3748,6 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeStartChildWorkflowExecution),
StartChildWorkflowExecutionDecisionAttributes: &workflow.StartChildWorkflowExecutionDecisionAttributes{
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(childID),
WorkflowType: childWorkflowType,
TaskList: taskListChild,
Expand Down Expand Up @@ -3841,7 +3840,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
s.Nil(err)
s.NotNil(completedEvent)
completedAttributes := completedEvent.ChildWorkflowExecutionCompletedEventAttributes
s.Equal(s.domainName, *completedAttributes.Domain)
s.Nil(completedAttributes.Domain)
s.Equal(childID, *completedAttributes.WorkflowExecution.WorkflowId)
s.Equal(wtChild, *completedAttributes.WorkflowType.Name)
s.Equal([]byte("Child Done."), completedAttributes.Result)
Expand Down Expand Up @@ -3913,11 +3912,7 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() {
return []byte(strconv.Itoa(int(continueAsNewCounter))), []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeContinueAsNewWorkflowExecution),
ContinueAsNewWorkflowExecutionDecisionAttributes: &workflow.ContinueAsNewWorkflowExecutionDecisionAttributes{
WorkflowType: childWorkflowType,
TaskList: taskList,
Input: buf.Bytes(),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(10),
Input: buf.Bytes(),
},
}}, nil
}
Expand Down Expand Up @@ -3945,12 +3940,8 @@ func (s *integrationSuite) TestChildWorkflowWithContinueAsNew() {
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(childID),
WorkflowType: childWorkflowType,
TaskList: taskList,
Input: buf.Bytes(),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(200),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyTerminate),
Control: nil,
ChildPolicy: common.ChildPolicyPtr(workflow.ChildPolicyTerminate),
},
}}, nil
} else if previousStartedEventID > 0 {
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ enum DecisionTaskFailedCause {
RESET_STICKY_TASKLIST,
WORKFLOW_WORKER_UNHANDLED_FAILURE,
BAD_SIGNAL_WORKFLOW_EXECUTION_ATTRIBUTES,
BAD_START_CHILD_EXECUTION_ATTRIBUTES,
}

enum CancelExternalWorkflowExecutionFailedCause {
Expand Down
46 changes: 23 additions & 23 deletions service/history/historyBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (b *historyBuilder) AddStartChildWorkflowExecutionInitiatedEvent(decisionCo
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddChildWorkflowExecutionStartedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) AddChildWorkflowExecutionStartedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID int64) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionStartedEvent(domain, execution, workflowType, initiatedID)

Expand All @@ -369,7 +369,7 @@ func (b *historyBuilder) AddStartChildWorkflowExecutionFailedEvent(initiatedID i
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddChildWorkflowExecutionCompletedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) AddChildWorkflowExecutionCompletedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
completedAttributes *workflow.WorkflowExecutionCompletedEventAttributes) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionCompletedEvent(domain, execution, workflowType, initiatedID, startedID,
Expand All @@ -378,7 +378,7 @@ func (b *historyBuilder) AddChildWorkflowExecutionCompletedEvent(domain string,
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddChildWorkflowExecutionFailedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) AddChildWorkflowExecutionFailedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
failedAttributes *workflow.WorkflowExecutionFailedEventAttributes) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionFailedEvent(domain, execution, workflowType, initiatedID, startedID,
Expand All @@ -387,7 +387,7 @@ func (b *historyBuilder) AddChildWorkflowExecutionFailedEvent(domain string, exe
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddChildWorkflowExecutionCanceledEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) AddChildWorkflowExecutionCanceledEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
canceledAttributes *workflow.WorkflowExecutionCanceledEventAttributes) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionCanceledEvent(domain, execution, workflowType, initiatedID, startedID,
Expand All @@ -396,7 +396,7 @@ func (b *historyBuilder) AddChildWorkflowExecutionCanceledEvent(domain string, e
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddChildWorkflowExecutionTerminatedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) AddChildWorkflowExecutionTerminatedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
terminatedAttributes *workflow.WorkflowExecutionTerminatedEventAttributes) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionTerminatedEvent(domain, execution, workflowType, initiatedID, startedID,
Expand All @@ -405,7 +405,7 @@ func (b *historyBuilder) AddChildWorkflowExecutionTerminatedEvent(domain string,
return b.addEventToHistory(event)
}

func (b *historyBuilder) AddChildWorkflowExecutionTimedOutEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) AddChildWorkflowExecutionTimedOutEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
timedOutAttributes *workflow.WorkflowExecutionTimedOutEventAttributes) *workflow.HistoryEvent {
event := b.newChildWorkflowExecutionTimedOutEvent(domain, execution, workflowType, initiatedID, startedID,
Expand Down Expand Up @@ -774,14 +774,14 @@ func (b *historyBuilder) newStartChildWorkflowExecutionInitiatedEvent(decisionTa
startAttributes *workflow.StartChildWorkflowExecutionDecisionAttributes) *workflow.HistoryEvent {
historyEvent := b.msBuilder.createNewHistoryEvent(workflow.EventTypeStartChildWorkflowExecutionInitiated)
attributes := &workflow.StartChildWorkflowExecutionInitiatedEventAttributes{}
attributes.Domain = common.StringPtr(*startAttributes.Domain)
attributes.WorkflowId = common.StringPtr(*startAttributes.WorkflowId)
attributes.Domain = startAttributes.Domain
attributes.WorkflowId = startAttributes.WorkflowId
attributes.WorkflowType = startAttributes.WorkflowType
attributes.TaskList = startAttributes.TaskList
attributes.Input = startAttributes.Input
attributes.ExecutionStartToCloseTimeoutSeconds = common.Int32Ptr(*startAttributes.ExecutionStartToCloseTimeoutSeconds)
attributes.TaskStartToCloseTimeoutSeconds = common.Int32Ptr(*startAttributes.TaskStartToCloseTimeoutSeconds)
attributes.ChildPolicy = common.ChildPolicyPtr(*startAttributes.ChildPolicy)
attributes.ExecutionStartToCloseTimeoutSeconds = startAttributes.ExecutionStartToCloseTimeoutSeconds
attributes.TaskStartToCloseTimeoutSeconds = startAttributes.TaskStartToCloseTimeoutSeconds
attributes.ChildPolicy = startAttributes.ChildPolicy
attributes.Control = startAttributes.Control
attributes.DecisionTaskCompletedEventId = common.Int64Ptr(decisionTaskCompletedEventID)
attributes.WorkflowIdReusePolicy = startAttributes.WorkflowIdReusePolicy
Expand All @@ -790,11 +790,11 @@ func (b *historyBuilder) newStartChildWorkflowExecutionInitiatedEvent(decisionTa
return historyEvent
}

func (b *historyBuilder) newChildWorkflowExecutionStartedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) newChildWorkflowExecutionStartedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID int64) *workflow.HistoryEvent {
historyEvent := b.msBuilder.createNewHistoryEvent(workflow.EventTypeChildWorkflowExecutionStarted)
attributes := &workflow.ChildWorkflowExecutionStartedEventAttributes{}
attributes.Domain = common.StringPtr(domain)
attributes.Domain = domain
attributes.WorkflowExecution = execution
attributes.WorkflowType = workflowType
attributes.InitiatedEventId = common.Int64Ptr(initiatedID)
Expand All @@ -820,12 +820,12 @@ func (b *historyBuilder) newStartChildWorkflowExecutionFailedEvent(initiatedID i
return historyEvent
}

func (b *historyBuilder) newChildWorkflowExecutionCompletedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) newChildWorkflowExecutionCompletedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
completedAttributes *workflow.WorkflowExecutionCompletedEventAttributes) *workflow.HistoryEvent {
historyEvent := b.msBuilder.createNewHistoryEvent(workflow.EventTypeChildWorkflowExecutionCompleted)
attributes := &workflow.ChildWorkflowExecutionCompletedEventAttributes{}
attributes.Domain = common.StringPtr(domain)
attributes.Domain = domain
attributes.WorkflowExecution = execution
attributes.WorkflowType = workflowType
attributes.InitiatedEventId = common.Int64Ptr(initiatedID)
Expand All @@ -836,12 +836,12 @@ func (b *historyBuilder) newChildWorkflowExecutionCompletedEvent(domain string,
return historyEvent
}

func (b *historyBuilder) newChildWorkflowExecutionFailedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) newChildWorkflowExecutionFailedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
failedAttributes *workflow.WorkflowExecutionFailedEventAttributes) *workflow.HistoryEvent {
historyEvent := b.msBuilder.createNewHistoryEvent(workflow.EventTypeChildWorkflowExecutionFailed)
attributes := &workflow.ChildWorkflowExecutionFailedEventAttributes{}
attributes.Domain = common.StringPtr(domain)
attributes.Domain = domain
attributes.WorkflowExecution = execution
attributes.WorkflowType = workflowType
attributes.InitiatedEventId = common.Int64Ptr(initiatedID)
Expand All @@ -853,12 +853,12 @@ func (b *historyBuilder) newChildWorkflowExecutionFailedEvent(domain string, exe
return historyEvent
}

func (b *historyBuilder) newChildWorkflowExecutionCanceledEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) newChildWorkflowExecutionCanceledEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
canceledAttributes *workflow.WorkflowExecutionCanceledEventAttributes) *workflow.HistoryEvent {
historyEvent := b.msBuilder.createNewHistoryEvent(workflow.EventTypeChildWorkflowExecutionCanceled)
attributes := &workflow.ChildWorkflowExecutionCanceledEventAttributes{}
attributes.Domain = common.StringPtr(domain)
attributes.Domain = domain
attributes.WorkflowExecution = execution
attributes.WorkflowType = workflowType
attributes.InitiatedEventId = common.Int64Ptr(initiatedID)
Expand All @@ -869,12 +869,12 @@ func (b *historyBuilder) newChildWorkflowExecutionCanceledEvent(domain string, e
return historyEvent
}

func (b *historyBuilder) newChildWorkflowExecutionTerminatedEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) newChildWorkflowExecutionTerminatedEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
terminatedAttributes *workflow.WorkflowExecutionTerminatedEventAttributes) *workflow.HistoryEvent {
historyEvent := b.msBuilder.createNewHistoryEvent(workflow.EventTypeChildWorkflowExecutionTerminated)
attributes := &workflow.ChildWorkflowExecutionTerminatedEventAttributes{}
attributes.Domain = common.StringPtr(domain)
attributes.Domain = domain
attributes.WorkflowExecution = execution
attributes.WorkflowType = workflowType
attributes.InitiatedEventId = common.Int64Ptr(initiatedID)
Expand All @@ -884,12 +884,12 @@ func (b *historyBuilder) newChildWorkflowExecutionTerminatedEvent(domain string,
return historyEvent
}

func (b *historyBuilder) newChildWorkflowExecutionTimedOutEvent(domain string, execution *workflow.WorkflowExecution,
func (b *historyBuilder) newChildWorkflowExecutionTimedOutEvent(domain *string, execution *workflow.WorkflowExecution,
workflowType *workflow.WorkflowType, initiatedID, startedID int64,
timedOutAttributes *workflow.WorkflowExecutionTimedOutEventAttributes) *workflow.HistoryEvent {
historyEvent := b.msBuilder.createNewHistoryEvent(workflow.EventTypeChildWorkflowExecutionTimedOut)
attributes := &workflow.ChildWorkflowExecutionTimedOutEventAttributes{}
attributes.Domain = common.StringPtr(domain)
attributes.Domain = domain
attributes.TimeoutType = timedOutAttributes.TimeoutType
attributes.WorkflowExecution = execution
attributes.WorkflowType = workflowType
Expand Down
Loading

0 comments on commit 4a2609a

Please sign in to comment.