From edb556c5f073cdfbbb6eff55ebd53e4aeb983b33 Mon Sep 17 00:00:00 2001 From: Samar Abbas - Uber Date: Fri, 16 Mar 2018 08:32:24 -0700 Subject: [PATCH] Persistence support for replication state for execution (#604) Added support for workflow mutable state to create/read/update replication state for an execution. Suport for transfer task creation for Replication Task. * Added unit test for replication state for execution * Address code review feedback * fix schema validation unit test --- .gen/go/replicator/idl.go | 4 +- .gen/go/replicator/types.go | 224 ++++++++++++- common/persistence/cassandraPersistence.go | 152 ++++++++- .../persistence/cassandraPersistence_test.go | 302 ++++++++++++++++++ common/persistence/dataInterfaces.go | 48 +++ common/persistence/persistenceTestBase.go | 57 +++- idl/github.com/uber/cadence/replicator.thrift | 6 + schema/cadence/schema.cql | 24 +- .../v0.6/history_replication_task.cql | 22 ++ schema/cadence/versioned/v0.6/manifest.json | 8 + tools/cassandra/updateTask_test.go | 2 +- 11 files changed, 834 insertions(+), 15 deletions(-) create mode 100644 schema/cadence/versioned/v0.6/history_replication_task.cql create mode 100644 schema/cadence/versioned/v0.6/manifest.json diff --git a/.gen/go/replicator/idl.go b/.gen/go/replicator/idl.go index cdb2eeefc93..d1473345f31 100644 --- a/.gen/go/replicator/idl.go +++ b/.gen/go/replicator/idl.go @@ -33,11 +33,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "replicator", Package: "github.com/uber/cadence/.gen/go/replicator", FilePath: "replicator.thrift", - SHA1: "37e450246bd52acca7eb4c925c127ea6d1c7bf8e", + SHA1: "053012257c4c99db426a24ba3403b0d939056309", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.replicator\n\ninclude \"shared.thrift\"\n\nenum ReplicationTaskType {\n Domain\n History\n}\n\nenum DomainOperation {\n Create\n Update\n}\n\nstruct DomainTaskAttributes {\n 05: optional DomainOperation domainOperation\n 10: optional string id\n 20: optional shared.DomainInfo info\n 30: optional shared.DomainConfiguration config\n 40: optional shared.DomainReplicationConfiguration replicationConfig\n 50: optional i64 (js.type = \"Long\") configVersion\n 60: optional i64 (js.type = \"Long\") failoverVersion\n}\n\nstruct HistoryTaskAttributes {\n}\n\nstruct ReplicationTask {\n 10: optional ReplicationTaskType taskType\n 20: optional DomainTaskAttributes domainTaskAttributes\n 30: optional HistoryTaskAttributes historyTaskAttributes\n}\n\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.replicator\n\ninclude \"shared.thrift\"\n\nenum ReplicationTaskType {\n Domain\n History\n}\n\nenum DomainOperation {\n Create\n Update\n}\n\nstruct DomainTaskAttributes {\n 05: optional DomainOperation domainOperation\n 10: optional string id\n 20: optional shared.DomainInfo info\n 30: optional shared.DomainConfiguration config\n 40: optional shared.DomainReplicationConfiguration replicationConfig\n 50: optional i64 (js.type = \"Long\") configVersion\n 60: optional i64 (js.type = \"Long\") failoverVersion\n}\n\nstruct HistoryTaskAttributes {\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional i64 (js.type = \"Long\") firstEventId\n 50: optional i64 (js.type = \"Long\") nextEventId\n 60: optional i64 (js.type = \"Long\") version\n}\n\nstruct ReplicationTask {\n 10: optional ReplicationTaskType taskType\n 20: optional DomainTaskAttributes domainTaskAttributes\n 30: optional HistoryTaskAttributes historyTaskAttributes\n}\n\n" diff --git a/.gen/go/replicator/types.go b/.gen/go/replicator/types.go index 55fbbbddef0..e90d6bbec5f 100644 --- a/.gen/go/replicator/types.go +++ b/.gen/go/replicator/types.go @@ -520,6 +520,12 @@ func (v *DomainTaskAttributes) GetFailoverVersion() (o int64) { } type HistoryTaskAttributes struct { + DomainId *string `json:"domainId,omitempty"` + WorkflowId *string `json:"workflowId,omitempty"` + RunId *string `json:"runId,omitempty"` + FirstEventId *int64 `json:"firstEventId,omitempty"` + NextEventId *int64 `json:"nextEventId,omitempty"` + Version *int64 `json:"version,omitempty"` } // ToWire translates a HistoryTaskAttributes struct into a Thrift-level intermediate @@ -539,10 +545,61 @@ type HistoryTaskAttributes struct { // } func (v *HistoryTaskAttributes) ToWire() (wire.Value, error) { var ( - fields [0]wire.Field + fields [6]wire.Field i int = 0 + w wire.Value + err error ) + if v.DomainId != nil { + w, err = wire.NewValueString(*(v.DomainId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 10, Value: w} + i++ + } + if v.WorkflowId != nil { + w, err = wire.NewValueString(*(v.WorkflowId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 20, Value: w} + i++ + } + if v.RunId != nil { + w, err = wire.NewValueString(*(v.RunId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 30, Value: w} + i++ + } + if v.FirstEventId != nil { + w, err = wire.NewValueI64(*(v.FirstEventId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 40, Value: w} + i++ + } + if v.NextEventId != nil { + w, err = wire.NewValueI64(*(v.NextEventId)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 50, Value: w} + i++ + } + if v.Version != nil { + w, err = wire.NewValueI64(*(v.Version)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 60, Value: w} + i++ + } + return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } @@ -564,9 +621,70 @@ func (v *HistoryTaskAttributes) ToWire() (wire.Value, error) { // } // return &v, nil func (v *HistoryTaskAttributes) FromWire(w wire.Value) error { + var err error for _, field := range w.GetStruct().Fields { switch field.ID { + case 10: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.DomainId = &x + if err != nil { + return err + } + + } + case 20: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.WorkflowId = &x + if err != nil { + return err + } + + } + case 30: + if field.Value.Type() == wire.TBinary { + var x string + x, err = field.Value.GetString(), error(nil) + v.RunId = &x + if err != nil { + return err + } + + } + case 40: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.FirstEventId = &x + if err != nil { + return err + } + + } + case 50: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.NextEventId = &x + if err != nil { + return err + } + + } + case 60: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.Version = &x + if err != nil { + return err + } + + } } } @@ -580,8 +698,32 @@ func (v *HistoryTaskAttributes) String() string { return "" } - var fields [0]string + var fields [6]string i := 0 + if v.DomainId != nil { + fields[i] = fmt.Sprintf("DomainId: %v", *(v.DomainId)) + i++ + } + if v.WorkflowId != nil { + fields[i] = fmt.Sprintf("WorkflowId: %v", *(v.WorkflowId)) + i++ + } + if v.RunId != nil { + fields[i] = fmt.Sprintf("RunId: %v", *(v.RunId)) + i++ + } + if v.FirstEventId != nil { + fields[i] = fmt.Sprintf("FirstEventId: %v", *(v.FirstEventId)) + i++ + } + if v.NextEventId != nil { + fields[i] = fmt.Sprintf("NextEventId: %v", *(v.NextEventId)) + i++ + } + if v.Version != nil { + fields[i] = fmt.Sprintf("Version: %v", *(v.Version)) + i++ + } return fmt.Sprintf("HistoryTaskAttributes{%v}", strings.Join(fields[:i], ", ")) } @@ -591,10 +733,88 @@ func (v *HistoryTaskAttributes) String() string { // // This function performs a deep comparison. func (v *HistoryTaskAttributes) Equals(rhs *HistoryTaskAttributes) bool { + if !_String_EqualsPtr(v.DomainId, rhs.DomainId) { + return false + } + if !_String_EqualsPtr(v.WorkflowId, rhs.WorkflowId) { + return false + } + if !_String_EqualsPtr(v.RunId, rhs.RunId) { + return false + } + if !_I64_EqualsPtr(v.FirstEventId, rhs.FirstEventId) { + return false + } + if !_I64_EqualsPtr(v.NextEventId, rhs.NextEventId) { + return false + } + if !_I64_EqualsPtr(v.Version, rhs.Version) { + return false + } return true } +// GetDomainId returns the value of DomainId if it is set or its +// zero value if it is unset. +func (v *HistoryTaskAttributes) GetDomainId() (o string) { + if v.DomainId != nil { + return *v.DomainId + } + + return +} + +// GetWorkflowId returns the value of WorkflowId if it is set or its +// zero value if it is unset. +func (v *HistoryTaskAttributes) GetWorkflowId() (o string) { + if v.WorkflowId != nil { + return *v.WorkflowId + } + + return +} + +// GetRunId returns the value of RunId if it is set or its +// zero value if it is unset. +func (v *HistoryTaskAttributes) GetRunId() (o string) { + if v.RunId != nil { + return *v.RunId + } + + return +} + +// GetFirstEventId returns the value of FirstEventId if it is set or its +// zero value if it is unset. +func (v *HistoryTaskAttributes) GetFirstEventId() (o int64) { + if v.FirstEventId != nil { + return *v.FirstEventId + } + + return +} + +// GetNextEventId returns the value of NextEventId if it is set or its +// zero value if it is unset. +func (v *HistoryTaskAttributes) GetNextEventId() (o int64) { + if v.NextEventId != nil { + return *v.NextEventId + } + + return +} + +// GetVersion returns the value of Version if it is set or its +// zero value if it is unset. +func (v *HistoryTaskAttributes) GetVersion() (o int64) { + if v.Version != nil { + return *v.Version + } + + return +} + type ReplicationTask struct { TaskType *ReplicationTaskType `json:"taskType,omitempty"` DomainTaskAttributes *DomainTaskAttributes `json:"domainTaskAttributes,omitempty"` diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index d0533dffa68..e4ce24a6207 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -141,6 +141,14 @@ const ( `client_impl: ?` + `}` + templateReplicationStateType = `{` + + `current_version: ?, ` + + `start_version: ?, ` + + `last_write_version: ?, ` + + `last_write_event_id: ?, ` + + `last_replication_info: ?` + + `}` + templateTransferTaskType = `{` + `domain_id: ?, ` + `workflow_id: ?, ` + @@ -152,7 +160,11 @@ const ( `target_child_workflow_only: ?, ` + `task_list: ?, ` + `type: ?, ` + - `schedule_id: ?` + + `schedule_id: ?,` + + `first_event_id: ?,` + + `next_event_id: ?,` + + `version: ?,` + + `last_replication_info: ?` + `}` templateTimerTaskType = `{` + @@ -215,6 +227,11 @@ const ( `control: ?` + `}` + templateReplicationInfoType = `{` + + `version: ?, ` + + `last_event_id: ?` + + `}` + templateTaskListType = `{` + `domain_id: ?, ` + `name: ?, ` + @@ -271,8 +288,8 @@ const ( `VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}) IF NOT EXISTS USING TTL 0 ` templateCreateWorkflowExecutionQuery2 = `INSERT INTO executions (` + - `shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id) ` + - `VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?) ` + `shard_id, domain_id, workflow_id, run_id, type, execution, replication_state, next_event_id, visibility_ts, task_id) ` + + `VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ` + templateReplicationStateType + `, ?, ?, ?) ` templateCreateTransferTaskQuery = `INSERT INTO executions (` + `shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` + @@ -293,7 +310,7 @@ const ( `and task_id = ? ` + `IF range_id = ?` - templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list ` + + templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list ` + `FROM executions ` + `WHERE shard_id = ? ` + `and type = ? ` + @@ -314,7 +331,7 @@ const ( `and task_id = ?` templateUpdateWorkflowExecutionQuery = `UPDATE executions ` + - `SET execution = ` + templateWorkflowExecutionType + `, next_event_id = ? ` + + `SET execution = ` + templateWorkflowExecutionType + `, replication_state = ` + templateReplicationStateType + `, next_event_id = ? ` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? ` + @@ -596,8 +613,18 @@ const ( ) var ( - defaultDateTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) - defaultVisibilityTimestamp = common.UnixNanoToCQLTimestamp(defaultDateTime.UnixNano()) + defaultDateTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + defaultVisibilityTimestamp = common.UnixNanoToCQLTimestamp(defaultDateTime.UnixNano()) + emptyVersion int64 = -24 + emptyReplicationInfo = map[string]*ReplicationInfo{} + emptyReplicationInfoMap = map[string]map[string]interface{}{} + emptyReplicationState = &ReplicationState{ + CurrentVersion: emptyVersion, + StartVersion: emptyVersion, + LastWriteVersion: emptyVersion, + LastWriteEventID: common.EmptyEventID, + LastReplicationInfo: emptyReplicationInfo, + } ) type ( @@ -948,6 +975,17 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat ) } + var lastReplicationInfo map[string]map[string]interface{} + if request.ReplicationState == nil { + request.ReplicationState = emptyReplicationState + lastReplicationInfo = emptyReplicationInfoMap + } else { + lastReplicationInfo = make(map[string]map[string]interface{}) + for k, v := range request.ReplicationState.LastReplicationInfo { + lastReplicationInfo[k] = createReplicationInfoMap(v) + } + } + batch.Query(templateCreateWorkflowExecutionQuery2, d.shardID, request.DomainID, @@ -988,6 +1026,11 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat "", // client_library_version "", // client_feature_version "", // client_impl + request.ReplicationState.CurrentVersion, + request.ReplicationState.StartVersion, + request.ReplicationState.LastWriteVersion, + request.ReplicationState.LastWriteEventID, + lastReplicationInfo, request.NextEventID, defaultVisibilityTimestamp, rowTypeExecutionTaskID) @@ -1027,6 +1070,9 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio info := createWorkflowExecutionInfo(result["execution"].(map[string]interface{})) state.ExecutionInfo = info + replicationState := createReplicationState(result["replication_state"].(map[string]interface{})) + state.ReplicationState = replicationState + activityInfos := make(map[int64]*ActivityInfo) aMap := result["activity_map"].(map[int64]map[string]interface{}) for key, value := range aMap { @@ -1087,6 +1133,17 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error { executionInfo := request.ExecutionInfo + replicationState := request.ReplicationState + var lastReplicationInfo map[string]map[string]interface{} + if replicationState == nil { + replicationState = emptyReplicationState + lastReplicationInfo = emptyReplicationInfoMap + } else { + lastReplicationInfo = make(map[string]map[string]interface{}) + for k, v := range replicationState.LastReplicationInfo { + lastReplicationInfo[k] = createReplicationInfoMap(v) + } + } cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano()) batch := d.session.NewBatch(gocql.LoggedBatch) @@ -1125,6 +1182,11 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx executionInfo.ClientLibraryVersion, executionInfo.ClientFeatureVersion, executionInfo.ClientImpl, + replicationState.CurrentVersion, + replicationState.StartVersion, + replicationState.LastWriteVersion, + replicationState.LastWriteEventID, + lastReplicationInfo, executionInfo.NextEventID, d.shardID, rowTypeExecution, @@ -1801,6 +1863,11 @@ func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferT targetWorkflowID := transferTaskTransferTargetWorkflowID targetRunID := transferTaskTypeTransferTargetRunID targetChildWorkflowOnly := false + // Replication transfer task specific information + firstEventID := common.EmptyEventID + nextEventID := common.EmptyEventID + version := int64(0) + var lastReplicationInfo map[string]map[string]interface{} switch task.GetType() { case TransferTaskTypeActivityTask: @@ -1837,6 +1904,15 @@ func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferT targetDomainID = task.(*StartChildExecutionTask).TargetDomainID targetWorkflowID = task.(*StartChildExecutionTask).TargetWorkflowID scheduleID = task.(*StartChildExecutionTask).InitiatedID + + case TransferTaskTypeReplicationTask: + firstEventID = task.(*ReplicationTask).FirstEventID + nextEventID = task.(*ReplicationTask).NextEventID + version = task.(*ReplicationTask).Version + lastReplicationInfo = make(map[string]map[string]interface{}) + for k, v := range task.(*ReplicationTask).LastReplicationInfo { + lastReplicationInfo[k] = createReplicationInfoMap(v) + } } batch.Query(templateCreateTransferTaskQuery, @@ -1856,6 +1932,10 @@ func (d *cassandraPersistence) createTransferTasks(batch *gocql.Batch, transferT taskList, task.GetType(), scheduleID, + firstEventID, + nextEventID, + version, + lastReplicationInfo, defaultVisibilityTimestamp, task.GetTaskID()) } @@ -2267,6 +2347,30 @@ func createWorkflowExecutionInfo(result map[string]interface{}) *WorkflowExecuti return info } +func createReplicationState(result map[string]interface{}) *ReplicationState { + info := &ReplicationState{} + for k, v := range result { + switch k { + case "current_version": + info.CurrentVersion = v.(int64) + case "start_version": + info.StartVersion = v.(int64) + case "last_write_version": + info.LastWriteVersion = v.(int64) + case "last_write_event_id": + info.LastWriteEventID = v.(int64) + case "last_replication_info": + info.LastReplicationInfo = make(map[string]*ReplicationInfo) + replicationInfoMap := v.(map[string]map[string]interface{}) + for key, value := range replicationInfoMap { + info.LastReplicationInfo[key] = createReplicationInfo(value) + } + } + } + + return info +} + func createTransferTaskInfo(result map[string]interface{}) *TransferTaskInfo { info := &TransferTaskInfo{} for k, v := range result { @@ -2296,6 +2400,18 @@ func createTransferTaskInfo(result map[string]interface{}) *TransferTaskInfo { info.TaskType = v.(int) case "schedule_id": info.ScheduleID = v.(int64) + case "first_event_id": + info.FirstEventID = v.(int64) + case "next_event_id": + info.NextEventID = v.(int64) + case "version": + info.Version = v.(int64) + case "last_replication_info": + info.LastReplicationInfo = make(map[string]*ReplicationInfo) + replicationInfoMap := v.(map[string]map[string]interface{}) + for key, value := range replicationInfoMap { + info.LastReplicationInfo[key] = createReplicationInfo(value) + } } } @@ -2478,6 +2594,28 @@ func createTimerTaskInfo(result map[string]interface{}) *TimerTaskInfo { return info } +func createReplicationInfo(result map[string]interface{}) *ReplicationInfo { + info := &ReplicationInfo{} + for k, v := range result { + switch k { + case "version": + info.Version = v.(int64) + case "last_event_id": + info.LastEventID = v.(int64) + } + } + + return info +} + +func createReplicationInfoMap(info *ReplicationInfo) map[string]interface{} { + rInfoMap := make(map[string]interface{}) + rInfoMap["version"] = info.Version + rInfoMap["last_event_id"] = info.LastEventID + + return rInfoMap +} + func isTimeoutError(err error) bool { if err == gocql.ErrTimeoutNoResponse { return true diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index 2858a3700b9..6920f0f0daf 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -1287,6 +1287,284 @@ func (s *cassandraPersistenceSuite) TestContinueAsNew() { s.Equal(*newWorkflowExecution.RunId, newRunID) } +func (s *cassandraPersistenceSuite) TestReplicationTransferTaskTasks() { + domainID := "2466d7de-6602-4ad8-b939-fb8f8c36c711" + workflowExecution := gen.WorkflowExecution{ + WorkflowId: common.StringPtr("replication-transfer-task-test"), + RunId: common.StringPtr("dcde9d85-5d7a-43c7-8b18-cb2cae0e29e0"), + } + + task0, err := s.CreateWorkflowExecution(domainID, workflowExecution, "queue1", "wType", 20, 13, nil, 3, 0, 2, nil) + s.Nil(err, "No error expected.") + s.NotEmpty(task0, "Expected non empty task identifier.") + + taskD, err := s.GetTransferTasks(1) + s.Equal(1, len(taskD), "Expected 1 decision task.") + err = s.CompleteTransferTask(taskD[0].TaskID) + s.Nil(err) + + state1, err := s.GetWorkflowExecutionInfo(domainID, workflowExecution) + s.Nil(err, "No error expected.") + info1 := state1.ExecutionInfo + s.NotNil(info1, "Valid Workflow info expected.") + updatedInfo1 := copyWorkflowExecutionInfo(info1) + + transferTasks := []Task{&ReplicationTask{ + TaskID: s.GetNextSequenceNumber(), + FirstEventID: int64(1), + NextEventID: int64(3), + Version: int64(9), + LastReplicationInfo: map[string]*ReplicationInfo{ + "dc1": &ReplicationInfo{ + Version: int64(3), + LastEventID: int64(1), + }, + "dc2": &ReplicationInfo{ + Version: int64(5), + LastEventID: int64(2), + }, + }, + }} + err = s.UpdateWorkflowExecutionWithTransferTasks(updatedInfo1, int64(3), transferTasks, nil) + s.Nil(err, "No error expected.") + + tasks1, err := s.GetTransferTasks(1) + s.Nil(err, "No error expected.") + s.NotNil(tasks1, "expected valid list of tasks.") + s.Equal(1, len(tasks1), "Expected 1 replication task.") + task1 := tasks1[0] + s.Equal(TransferTaskTypeReplicationTask, task1.TaskType) + s.Equal(domainID, task1.DomainID) + s.Equal(*workflowExecution.WorkflowId, task1.WorkflowID) + s.Equal(*workflowExecution.RunId, task1.RunID) + s.Equal(int64(1), task1.FirstEventID) + s.Equal(int64(3), task1.NextEventID) + s.Equal(int64(9), task1.Version) + s.Equal(2, len(task1.LastReplicationInfo)) + for k, v := range task1.LastReplicationInfo { + log.Infof("ReplicationInfo for %v: {Version: %v, LastEventID: %v}", k, v.Version, v.LastEventID) + switch k { + case "dc1": + s.Equal(int64(3), v.Version) + s.Equal(int64(1), v.LastEventID) + case "dc2": + s.Equal(int64(5), v.Version) + s.Equal(int64(2), v.LastEventID) + default: + s.Fail("Unexpected key") + } + } + + err = s.CompleteTransferTask(task1.TaskID) + s.Nil(err) +} + +func (s *cassandraPersistenceSuite) TestWorkflowReplicationState() { + domainID := uuid.New() + runID := uuid.New() + workflowExecution := gen.WorkflowExecution{ + WorkflowId: common.StringPtr("test-workflow-replication-state-test"), + RunId: common.StringPtr(runID), + } + + replicationTasks := []Task{&ReplicationTask{ + TaskID: s.GetNextSequenceNumber(), + FirstEventID: int64(1), + NextEventID: int64(3), + Version: int64(9), + LastReplicationInfo: map[string]*ReplicationInfo{ + "dc1": &ReplicationInfo{ + Version: int64(3), + LastEventID: int64(1), + }, + "dc2": &ReplicationInfo{ + Version: int64(5), + LastEventID: int64(2), + }, + }, + }} + + task0, err0 := s.CreateWorkflowExecutionWithReplication(domainID, workflowExecution, "taskList", "wType", 20, 13, 3, + 0, 2, &ReplicationState{ + CurrentVersion: int64(9), + StartVersion: int64(8), + LastWriteVersion: int64(7), + LastWriteEventID: int64(6), + LastReplicationInfo: map[string]*ReplicationInfo{ + "dc1": { + Version: int64(3), + LastEventID: int64(1), + }, + "dc2": { + Version: int64(5), + LastEventID: int64(2), + }, + }, + }, replicationTasks) + s.Nil(err0, "No error expected.") + s.NotEmpty(task0, "Expected non empty task identifier.") + + taskD, err := s.GetTransferTasks(2) + s.Equal(2, len(taskD), "Expected 1 decision task.") + for _, tsk := range taskD { + switch tsk.TaskType { + case TransferTaskTypeDecisionTask: + err = s.CompleteTransferTask(taskD[0].TaskID) + s.Nil(err) + case TransferTaskTypeReplicationTask: + s.Equal(domainID, tsk.DomainID) + s.Equal(*workflowExecution.WorkflowId, tsk.WorkflowID) + s.Equal(*workflowExecution.RunId, tsk.RunID) + s.Equal(int64(1), tsk.FirstEventID) + s.Equal(int64(3), tsk.NextEventID) + s.Equal(int64(9), tsk.Version) + s.Equal(2, len(tsk.LastReplicationInfo)) + for k, v := range tsk.LastReplicationInfo { + log.Infof("ReplicationInfo for %v: {Version: %v, LastEventID: %v}", k, v.Version, v.LastEventID) + switch k { + case "dc1": + s.Equal(int64(3), v.Version) + s.Equal(int64(1), v.LastEventID) + case "dc2": + s.Equal(int64(5), v.Version) + s.Equal(int64(2), v.LastEventID) + default: + s.Fail("Unexpected key") + } + } + err = s.CompleteTransferTask(taskD[0].TaskID) + s.Nil(err) + } + } + + state0, err1 := s.GetWorkflowExecutionInfo(domainID, workflowExecution) + s.Nil(err1, "No error expected.") + info0 := state0.ExecutionInfo + replicationState0 := state0.ReplicationState + s.NotNil(info0, "Valid Workflow info expected.") + s.Equal(domainID, info0.DomainID) + s.Equal("taskList", info0.TaskList) + s.Equal("wType", info0.WorkflowTypeName) + s.Equal(int32(20), info0.WorkflowTimeout) + s.Equal(int32(13), info0.DecisionTimeoutValue) + s.Equal(int64(3), info0.NextEventID) + s.Equal(int64(0), info0.LastProcessedEvent) + s.Equal(int64(2), info0.DecisionScheduleID) + s.Equal(int64(9), replicationState0.CurrentVersion) + s.Equal(int64(8), replicationState0.StartVersion) + s.Equal(int64(7), replicationState0.LastWriteVersion) + s.Equal(int64(6), replicationState0.LastWriteEventID) + s.Equal(2, len(replicationState0.LastReplicationInfo)) + for k, v := range replicationState0.LastReplicationInfo { + log.Infof("ReplicationInfo for %v: {Version: %v, LastEventID: %v}", k, v.Version, v.LastEventID) + switch k { + case "dc1": + s.Equal(int64(3), v.Version) + s.Equal(int64(1), v.LastEventID) + case "dc2": + s.Equal(int64(5), v.Version) + s.Equal(int64(2), v.LastEventID) + default: + s.Fail("Unexpected key") + } + } + + updatedInfo := copyWorkflowExecutionInfo(info0) + updatedInfo.NextEventID = int64(5) + updatedInfo.LastProcessedEvent = int64(2) + updatedReplicationState := copyReplicationState(replicationState0) + updatedReplicationState.CurrentVersion = int64(10) + updatedReplicationState.StartVersion = int64(11) + updatedReplicationState.LastWriteVersion = int64(12) + updatedReplicationState.LastWriteEventID = int64(13) + updatedReplicationState.LastReplicationInfo["dc1"].Version = int64(4) + updatedReplicationState.LastReplicationInfo["dc1"].LastEventID = int64(2) + + replicationTasks1 := []Task{&ReplicationTask{ + TaskID: s.GetNextSequenceNumber(), + FirstEventID: int64(3), + NextEventID: int64(5), + Version: int64(10), + LastReplicationInfo: map[string]*ReplicationInfo{ + "dc1": &ReplicationInfo{ + Version: int64(4), + LastEventID: int64(2), + }, + "dc2": &ReplicationInfo{ + Version: int64(5), + LastEventID: int64(2), + }, + }, + }} + err2 := s.UpdateWorklowStateAndReplication(updatedInfo, updatedReplicationState, int64(3), replicationTasks1) + s.Nil(err2, "No error expected.") + + taskD1, err := s.GetTransferTasks(2) + s.Equal(1, len(taskD1), "Expected 1 decision task.") + for _, tsk := range taskD1 { + switch tsk.TaskType { + case TransferTaskTypeDecisionTask: + err = s.CompleteTransferTask(taskD1[0].TaskID) + s.Nil(err) + case TransferTaskTypeReplicationTask: + s.Equal(domainID, tsk.DomainID) + s.Equal(*workflowExecution.WorkflowId, tsk.WorkflowID) + s.Equal(*workflowExecution.RunId, tsk.RunID) + s.Equal(int64(3), tsk.FirstEventID) + s.Equal(int64(5), tsk.NextEventID) + s.Equal(int64(10), tsk.Version) + s.Equal(2, len(tsk.LastReplicationInfo)) + for k, v := range tsk.LastReplicationInfo { + log.Infof("ReplicationInfo for %v: {Version: %v, LastEventID: %v}", k, v.Version, v.LastEventID) + switch k { + case "dc1": + s.Equal(int64(4), v.Version) + s.Equal(int64(2), v.LastEventID) + case "dc2": + s.Equal(int64(5), v.Version) + s.Equal(int64(2), v.LastEventID) + default: + s.Fail("Unexpected key") + } + } + err = s.CompleteTransferTask(taskD1[0].TaskID) + s.Nil(err) + } + } + + state1, err2 := s.GetWorkflowExecutionInfo(domainID, workflowExecution) + s.Nil(err2, "No error expected.") + info1 := state1.ExecutionInfo + replicationState1 := state1.ReplicationState + s.NotNil(info1, "Valid Workflow info expected.") + s.Equal(domainID, info1.DomainID) + s.Equal("taskList", info1.TaskList) + s.Equal("wType", info1.WorkflowTypeName) + s.Equal(int32(20), info1.WorkflowTimeout) + s.Equal(int32(13), info1.DecisionTimeoutValue) + s.Equal(int64(5), info1.NextEventID) + s.Equal(int64(2), info1.LastProcessedEvent) + s.Equal(int64(2), info1.DecisionScheduleID) + s.Equal(int64(10), replicationState1.CurrentVersion) + s.Equal(int64(11), replicationState1.StartVersion) + s.Equal(int64(12), replicationState1.LastWriteVersion) + s.Equal(int64(13), replicationState1.LastWriteEventID) + s.Equal(2, len(replicationState1.LastReplicationInfo)) + for k, v := range replicationState1.LastReplicationInfo { + log.Infof("ReplicationInfo for %v: {Version: %v, LastEventID: %v}", k, v.Version, v.LastEventID) + switch k { + case "dc1": + s.Equal(int64(4), v.Version) + s.Equal(int64(2), v.LastEventID) + case "dc2": + s.Equal(int64(5), v.Version) + s.Equal(int64(2), v.LastEventID) + default: + s.Fail("Unexpected key") + } + } +} + func copyWorkflowExecutionInfo(sourceInfo *WorkflowExecutionInfo) *WorkflowExecutionInfo { return &WorkflowExecutionInfo{ DomainID: sourceInfo.DomainID, @@ -1313,3 +1591,27 @@ func copyWorkflowExecutionInfo(sourceInfo *WorkflowExecutionInfo) *WorkflowExecu DecisionTimeout: sourceInfo.DecisionTimeout, } } + +func copyReplicationState(sourceState *ReplicationState) *ReplicationState { + state := &ReplicationState{ + CurrentVersion: sourceState.CurrentVersion, + StartVersion: sourceState.StartVersion, + LastWriteVersion: sourceState.LastWriteVersion, + LastWriteEventID: sourceState.LastWriteEventID, + } + if sourceState.LastReplicationInfo != nil { + state.LastReplicationInfo = map[string]*ReplicationInfo{} + for k, v := range sourceState.LastReplicationInfo { + state.LastReplicationInfo[k] = copyReplicationInfo(v) + } + } + + return state +} + +func copyReplicationInfo(sourceInfo *ReplicationInfo) *ReplicationInfo { + return &ReplicationInfo{ + Version: sourceInfo.Version, + LastEventID: sourceInfo.LastEventID, + } +} diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 68e96257350..8a66e3c24aa 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -73,6 +73,7 @@ const ( TransferTaskTypeCancelExecution TransferTaskTypeStartChildExecution TransferTaskTypeSignalExecution + TransferTaskTypeReplicationTask ) // Types of timers @@ -164,6 +165,16 @@ type ( ClientImpl string } + // ReplicationState represents mutable state information for global domains. + // This information is used by replication protocol when applying events from remote clusters + ReplicationState struct { + CurrentVersion int64 + StartVersion int64 + LastWriteVersion int64 + LastWriteEventID int64 + LastReplicationInfo map[string]*ReplicationInfo + } + // TransferTaskInfo describes a transfer task TransferTaskInfo struct { DomainID string @@ -177,6 +188,10 @@ type ( TaskList string TaskType int ScheduleID int64 + FirstEventID int64 + NextEventID int64 + Version int64 + LastReplicationInfo map[string]*ReplicationInfo } // TimerTaskInfo describes a timer task. @@ -304,6 +319,21 @@ type ( EventID int64 } + // ReplicationTask is the transfer task created for shipping history replication events to other clusters + ReplicationTask struct { + TaskID int64 + FirstEventID int64 + NextEventID int64 + Version int64 + LastReplicationInfo map[string]*ReplicationInfo + } + + // ReplicationInfo represents the information stored for last replication event details per cluster + ReplicationInfo struct { + Version int64 + LastEventID int64 + } + // WorkflowMutableState indicates workflow related state WorkflowMutableState struct { ActivitInfos map[int64]*ActivityInfo @@ -313,6 +343,7 @@ type ( SignalInfos map[int64]*SignalInfo SignalRequestedIDs map[string]struct{} ExecutionInfo *WorkflowExecutionInfo + ReplicationState *ReplicationState BufferedEvents []*SerializedHistoryEventBatch } @@ -414,6 +445,7 @@ type ( ContinueAsNew bool PreviousRunID string ExecutionInfo *WorkflowExecutionInfo + ReplicationState *ReplicationState } // CreateWorkflowExecutionResponse is the response to CreateWorkflowExecutionRequest @@ -449,6 +481,7 @@ type ( // UpdateWorkflowExecutionRequest is used to update a workflow execution UpdateWorkflowExecutionRequest struct { ExecutionInfo *WorkflowExecutionInfo + ReplicationState *ReplicationState TransferTasks []Task TimerTasks []Task DeleteTimerTask Task @@ -1001,6 +1034,21 @@ func (u *StartChildExecutionTask) SetTaskID(id int64) { u.TaskID = id } +// GetType returns the type of the activity task +func (a *ReplicationTask) GetType() int { + return TransferTaskTypeReplicationTask +} + +// GetTaskID returns the sequence ID of the activity task +func (a *ReplicationTask) GetTaskID() int64 { + return a.TaskID +} + +// SetTaskID sets the sequence ID of the activity task +func (a *ReplicationTask) SetTaskID(id int64) { + a.TaskID = id +} + // NewHistoryEventBatch returns a new instance of HistoryEventBatch func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch { return &HistoryEventBatch{ diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index fd1c7dce486..e3667313ea2 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -262,6 +262,39 @@ func (s *TestBase) CreateWorkflowExecution(domainID string, workflowExecution wo return response, err } +// CreateWorkflowExecution is a utility method to create workflow executions +func (s *TestBase) CreateWorkflowExecutionWithReplication(domainID string, workflowExecution workflow.WorkflowExecution, + taskList, wType string, wTimeout int32, decisionTimeout int32, nextEventID int64, + lastProcessedEventID int64, decisionScheduleID int64, state *ReplicationState, txTasks []Task) ( + *CreateWorkflowExecutionResponse, error) { + transferTasks := txTasks + transferTasks = append(transferTasks, &DecisionTask{ + TaskID: s.GetNextSequenceNumber(), + DomainID: domainID, + TaskList: taskList, + ScheduleID: decisionScheduleID, + }) + response, err := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ + RequestID: uuid.New(), + DomainID: domainID, + Execution: workflowExecution, + TaskList: taskList, + WorkflowTypeName: wType, + WorkflowTimeout: wTimeout, + DecisionTimeoutValue: decisionTimeout, + NextEventID: nextEventID, + LastProcessedEvent: lastProcessedEventID, + RangeID: s.ShardInfo.RangeID, + TransferTasks: transferTasks, + DecisionScheduleID: decisionScheduleID, + DecisionStartedID: common.EmptyEventID, + DecisionStartToCloseTimeout: 1, + ReplicationState: state, + }) + + return response, err +} + // CreateWorkflowExecutionManyTasks is a utility method to create workflow executions func (s *TestBase) CreateWorkflowExecutionManyTasks(domainID string, workflowExecution workflow.WorkflowExecution, taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64, @@ -517,6 +550,13 @@ func (s *TestBase) DeleteSignalsRequestedState(updatedInfo *WorkflowExecutionInf nil, nil, nil, deleteSignalsRequestedID) } +// DeleteSignalsRequestedState is a utility method to delete mutable state of workflow execution +func (s *TestBase) UpdateWorklowStateAndReplication(updatedInfo *WorkflowExecutionInfo, + updatedReplicationState *ReplicationState, condition int64, txTasks []Task) error { + return s.UpdateWorkflowExecutionWithReplication(updatedInfo, updatedReplicationState, nil, nil, + s.ShardInfo.RangeID, condition, nil, txTasks, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "") +} + // UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task, @@ -525,7 +565,21 @@ func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecu upsertCancelInfos []*RequestCancelInfo, deleteCancelInfo *int64, upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64, upsertSignalRequestedIDs []string, deleteSignalRequestedID string) error { - transferTasks := []Task{} + return s.UpdateWorkflowExecutionWithReplication(updatedInfo, nil, decisionScheduleIDs, activityScheduleIDs, rangeID, + condition, timerTasks, []Task{}, deleteTimerTask, upsertActivityInfos, deleteActivityInfo, upsertTimerInfos, deleteTimerInfos, + upsertChildInfos, deleteChildInfo, upsertCancelInfos, deleteCancelInfo, upsertSignalInfos, deleteSignalInfo, + upsertSignalRequestedIDs, deleteSignalRequestedID) +} + +// UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution +func (s *TestBase) UpdateWorkflowExecutionWithReplication(updatedInfo *WorkflowExecutionInfo, + updatedReplicationState *ReplicationState, decisionScheduleIDs []int64, activityScheduleIDs []int64, rangeID, + condition int64, timerTasks []Task, txTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, + deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string, + upsertChildInfos []*ChildExecutionInfo, deleteChildInfo *int64, upsertCancelInfos []*RequestCancelInfo, + deleteCancelInfo *int64, upsertSignalInfos []*SignalInfo, deleteSignalInfo *int64, upsertSignalRequestedIDs []string, + deleteSignalRequestedID string) error { + transferTasks := txTasks for _, decisionScheduleID := range decisionScheduleIDs { transferTasks = append(transferTasks, &DecisionTask{ TaskID: s.GetNextSequenceNumber(), @@ -544,6 +598,7 @@ func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecu return s.WorkflowMgr.UpdateWorkflowExecution(&UpdateWorkflowExecutionRequest{ ExecutionInfo: updatedInfo, + ReplicationState: updatedReplicationState, TransferTasks: transferTasks, TimerTasks: timerTasks, Condition: condition, diff --git a/idl/github.com/uber/cadence/replicator.thrift b/idl/github.com/uber/cadence/replicator.thrift index f0204ade321..0dba8b6612e 100644 --- a/idl/github.com/uber/cadence/replicator.thrift +++ b/idl/github.com/uber/cadence/replicator.thrift @@ -43,6 +43,12 @@ struct DomainTaskAttributes { } struct HistoryTaskAttributes { + 10: optional string domainId + 20: optional string workflowId + 30: optional string runId + 40: optional i64 (js.type = "Long") firstEventId + 50: optional i64 (js.type = "Long") nextEventId + 60: optional i64 (js.type = "Long") version } struct ReplicationTask { diff --git a/schema/cadence/schema.cql b/schema/cadence/schema.cql index e8aaa511cb4..872ef6a4f42 100644 --- a/schema/cadence/schema.cql +++ b/schema/cadence/schema.cql @@ -46,7 +46,22 @@ CREATE TYPE workflow_execution ( sticky_schedule_to_start_timeout int, client_library_version text, client_feature_version text, - client_impl text, + client_impl text, +); + +-- Replication information for each cluster +CREATE TYPE replication_info ( + version bigint, + last_event_id bigint, +); + +-- This is used to store replication information for a workflow execution +CREATE TYPE replication_state ( + current_version bigint, -- current version for domain, incremented on failover + start_version bigint, -- version of domain when the workflow execution was started + last_write_version bigint, -- version of domain when the last event was written to history + last_write_event_id bigint, -- last written event id for a given version + last_replication_info map>, -- information about replication events from other clusters ); -- TODO: Remove fields that are left over from activity and workflow tasks. @@ -60,8 +75,12 @@ CREATE TYPE transfer_task ( target_run_id uuid, -- The external run ID that this transfer task is doing work for. target_child_workflow_only boolean, -- The whether target child workflow only. task_list text, - type int, -- enum TaskType {ActivityTask, DecisionTask, DeleteExecution, CancelExecution, StartChildExecution} + type int, -- enum TaskType {ActivityTask, DecisionTask, DeleteExecution, CancelExecution, StartChildExecution, ReplicationTask} schedule_id bigint, + first_event_id bigint, -- Used by ReplicationTask to set the first event ID of the applied transaction + next_event_id bigint, -- Used by ReplicationTask to set the next event ID of the applied transaction + version bigint, -- Used by ReplicationTask to set the failover version of the applied transaction + last_replication_info map>, -- Used by replication task to snapshot replication information when the transaction was applied ); CREATE TYPE timer_task ( @@ -195,6 +214,7 @@ CREATE TABLE executions ( signal_map map>, signal_requested set, buffered_events_list list>, + replication_state frozen, -- Replication information part of mutable state PRIMARY KEY (shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id) ) WITH COMPACTION = { 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy' diff --git a/schema/cadence/versioned/v0.6/history_replication_task.cql b/schema/cadence/versioned/v0.6/history_replication_task.cql new file mode 100644 index 00000000000..b602a9babdb --- /dev/null +++ b/schema/cadence/versioned/v0.6/history_replication_task.cql @@ -0,0 +1,22 @@ +-- Replication information for each cluster +CREATE TYPE replication_info ( + version bigint, + last_event_id bigint, +); + +-- This is used to store replication information for a workflow execution +CREATE TYPE replication_state ( + current_version bigint, -- current version for domain, incremented on failover + start_version bigint, -- version of domain when the workflow execution was started + last_write_version bigint, -- version of domain when the last event was written to history + last_write_event_id bigint, -- last written event id for a given version + last_replication_info map>, -- information about replication events from other clusters +); + +-- Replication information part of mutable state +ALTER TABLE executions ADD replication_state frozen; + +ALTER TYPE transfer_task ADD first_event_id bigint; +ALTER TYPE transfer_task ADD next_event_id bigint; +ALTER TYPE transfer_task ADD version bigint; +ALTER TYPE transfer_task ADD last_replication_info map>; \ No newline at end of file diff --git a/schema/cadence/versioned/v0.6/manifest.json b/schema/cadence/versioned/v0.6/manifest.json new file mode 100644 index 00000000000..efc7512d227 --- /dev/null +++ b/schema/cadence/versioned/v0.6/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.6", + "MinCompatibleVersion": "0.6", + "Description": "Persistence support for replication of workflow execution", + "SchemaUpdateCqlFiles": [ + "history_replication_task.cql" + ] +} \ No newline at end of file diff --git a/tools/cassandra/updateTask_test.go b/tools/cassandra/updateTask_test.go index 12a1dafdd8a..0eba412f876 100644 --- a/tools/cassandra/updateTask_test.go +++ b/tools/cassandra/updateTask_test.go @@ -130,7 +130,7 @@ func (s *UpdateSchemaTestSuite) TestDryrun() { s.Nil(err) // update the version to the latest s.log.Infof("Ver: %v", ver) - s.Equal(0, cmpVersion(ver, "0.5")) + s.Equal(0, cmpVersion(ver, "0.6")) dropAllTablesTypes(client) }