From 9a0c282bd57078e7b701581b571ed2352a6ccce5 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 21 May 2020 10:18:22 -0700 Subject: [PATCH 1/5] Failover marker persistence --- common/constants.go | 2 + common/metrics/defs.go | 3 ++ common/mocks/ExecutionManager.go | 14 +++++++ .../cassandra/cassandraPersistence.go | 22 +++++++++++ .../cassandra/cassandraPersistenceUtil.go | 5 +++ common/persistence/dataInterfaces.go | 9 ++++- common/persistence/executionStore.go | 6 +++ .../persistence-tests/executionManagerTest.go | 23 +++++++++++ .../persistence-tests/persistenceTestBase.go | 10 +++++ common/persistence/persistenceInterface.go | 1 + .../persistence/persistenceMetricClients.go | 14 +++++++ .../persistenceRateLimitedClients.go | 9 +++++ common/persistence/sql/sqlExecutionManager.go | 39 +++++++++++++++++++ .../sql/sqlExecutionManagerUtil.go | 7 +++- 14 files changed, 161 insertions(+), 3 deletions(-) diff --git a/common/constants.go b/common/constants.go index bc240e52568..e3a4229ce5a 100644 --- a/common/constants.go +++ b/common/constants.go @@ -52,6 +52,8 @@ const ( const ( // EmptyUUID is the placeholder for UUID when it's empty EmptyUUID = "emptyUuid" + // EmptyReplicationUUID is the placeholder for replication task UUID when it's empty + EmptyReplicationUUID = "30000000-5000-f000-f000-000000000000" ) const ( diff --git a/common/metrics/defs.go b/common/metrics/defs.go index b2be78730c5..6411bcf3b09 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -170,6 +170,8 @@ const ( PersistenceDeleteReplicationTaskFromDLQScope // PersistenceRangeDeleteReplicationTaskFromDLQScope tracks PersistenceRangeDeleteReplicationTaskFromDLQScope calls made by service to persistence layer PersistenceRangeDeleteReplicationTaskFromDLQScope + // PersistenceCreateFailoverMakerTaskScope tracks CreateFailoverMakerTasks calls made by service to persistence layer + PersistenceCreateFailoverMakerTasksScope // PersistenceGetTimerIndexTasksScope tracks GetTimerIndexTasks calls made by service to persistence layer PersistenceGetTimerIndexTasksScope // PersistenceCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer @@ -1067,6 +1069,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PersistenceGetReplicationTasksFromDLQScope: {operation: "GetReplicationTasksFromDLQ"}, PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"}, PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"}, + PersistenceCreateFailoverMakerTasksScope: {operation: "CreateFailoverMarkerTasks"}, PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"}, PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"}, PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"}, diff --git a/common/mocks/ExecutionManager.go b/common/mocks/ExecutionManager.go index 1fc089b04ce..7e2bbc8d138 100644 --- a/common/mocks/ExecutionManager.go +++ b/common/mocks/ExecutionManager.go @@ -453,6 +453,20 @@ func (_m *ExecutionManager) RangeCompleteTimerTask(request *persistence.RangeCom return r0 } +// CreateFailoverMarkerTasks provides a mock function with given fields: request +func (_m *ExecutionManager) CreateFailoverMarkerTasks(request *persistence.CreateFailoverMarkersRequest) error { + ret := _m.Called(request) + + var r0 error + if rf, ok := ret.Get(0).(func(*persistence.CreateFailoverMarkersRequest) error); ok { + r0 = rf(request) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Close provides a mock function with given fields: func (_m *ExecutionManager) Close() { _m.Called() diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index 2a0594bf941..f33541b8e7f 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -2892,3 +2892,25 @@ func (d *cassandraPersistence) RangeDeleteReplicationTaskFromDLQ( } return nil } + +func (d *cassandraPersistence) CreateFailoverMarkerTasks( + request *p.CreateFailoverMarkersRequest, +) error { + + batch := d.session.NewBatch(gocql.LoggedBatch) + for _, task := range request.Markers { + t := []p.Task{task} + if err := createReplicationTasks( + batch, + t, + d.shardID, + task.DomainID, + rowTypeReplicationWorkflowID, + rowTypeReplicationRunID, + ); err != nil { + return err + } + } + + return d.session.ExecuteBatch(batch) +} diff --git a/common/persistence/cassandra/cassandraPersistenceUtil.go b/common/persistence/cassandra/cassandraPersistenceUtil.go index 92a12a6a0a4..6014b2dcd5d 100644 --- a/common/persistence/cassandra/cassandraPersistenceUtil.go +++ b/common/persistence/cassandra/cassandraPersistenceUtil.go @@ -1086,6 +1086,11 @@ func createReplicationTasks( // cassandra does not like null lastReplicationInfo = make(map[string]map[string]interface{}) + case p.ReplicationTaskTypeFailoverMarker: + version = task.GetVersion() + // Failover marker uses firstEventID to store visibility timestamp + firstEventID = task.GetVisibilityTimestamp().UnixNano() + default: return &workflow.InternalServiceError{ Message: fmt.Sprintf("Unknow replication type: %v", task.GetType()), diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index a70401364e1..1ebf3f5cf52 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -609,9 +609,8 @@ type ( // FailoverMarkerTask is the marker for graceful failover FailoverMarkerTask struct { TaskID int64 - VisibilityTimestamp time.Time + VisibilityTimestamp time.Time // Visibility timestamp stores in the field scheduleEventID Version int64 - SourceCluster string DomainID string } @@ -1474,6 +1473,11 @@ type ( Branches []HistoryBranchDetail } + // CreateFailoverMarkersRequest is request to create failover markers + CreateFailoverMarkersRequest struct { + Markers []*FailoverMarkerTask + } + // Closeable is an interface for any entity that supports a close operation to release resources Closeable interface { Close() @@ -1516,6 +1520,7 @@ type ( GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error) DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error + CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error // Timer related methods. GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) diff --git a/common/persistence/executionStore.go b/common/persistence/executionStore.go index a7f00517086..8a4f613b294 100644 --- a/common/persistence/executionStore.go +++ b/common/persistence/executionStore.go @@ -868,6 +868,12 @@ func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ( return m.persistence.RangeDeleteReplicationTaskFromDLQ(request) } +func (m *executionManagerImpl) CreateFailoverMarkerTasks( + request *CreateFailoverMarkersRequest, +) error { + return m.persistence.CreateFailoverMarkerTasks(request) +} + // Timer related methods. func (m *executionManagerImpl) GetTimerIndexTasks( request *GetTimerIndexTasksRequest, diff --git a/common/persistence/persistence-tests/executionManagerTest.go b/common/persistence/persistence-tests/executionManagerTest.go index 19aa729f355..94bcc93ade3 100644 --- a/common/persistence/persistence-tests/executionManagerTest.go +++ b/common/persistence/persistence-tests/executionManagerTest.go @@ -5280,6 +5280,29 @@ func (s *ExecutionManagerSuite) TestReplicationDLQ() { s.Len(resp.Tasks, 0) } +// TestCreateFailoverMarkerTasks test +func (s *ExecutionManagerSuite) TestCreateFailoverMarkerTasks() { + domainID := uuid.New() + markers := []*p.FailoverMarkerTask{ + { + TaskID: 1, + VisibilityTimestamp: time.Now(), + DomainID: domainID, + Version: 1, + }, + } + err := s.CreateFailoverMarkers(markers) + s.NoError(err) + + tasks, err := s.GetReplicationTasks(1, true) + s.NoError(err) + s.Equal(len(tasks), 1) + s.Equal(tasks[0].Version, int64(1)) + s.Equal(tasks[0].TaskID, int64(1)) + s.Equal(tasks[0].DomainID, domainID) + s.Equal(tasks[0].TaskType, p.ReplicationTaskTypeFailoverMarker) +} + func copyWorkflowExecutionInfo(sourceInfo *p.WorkflowExecutionInfo) *p.WorkflowExecutionInfo { return &p.WorkflowExecutionInfo{ DomainID: sourceInfo.DomainID, diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index ea35b6628a0..919f07080c3 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -1151,6 +1151,16 @@ func (s *TestBase) RangeDeleteReplicationTaskFromDLQ( }) } +// CreateFailoverMarkers is a utility method to create failover markers +func (s *TestBase) CreateFailoverMarkers( + markers []*p.FailoverMarkerTask, +) error { + + return s.ExecutionManager.CreateFailoverMarkerTasks(&p.CreateFailoverMarkersRequest{ + Markers: markers, + }) +} + // CompleteTransferTask is a utility method to complete a transfer task func (s *TestBase) CompleteTransferTask(taskID int64) error { diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 57816f28ff6..277d3777b53 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -83,6 +83,7 @@ type ( GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error) DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error + CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error // Timer related methods. GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index 15e4ab67d06..e869498fef7 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -492,6 +492,20 @@ func (p *workflowExecutionPersistenceClient) RangeDeleteReplicationTaskFromDLQ( return nil } +func (p *workflowExecutionPersistenceClient) CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error { + p.metricClient.IncCounter(metrics.PersistenceCreateFailoverMakerTasksScope, metrics.PersistenceRequests) + + sw := p.metricClient.StartTimer(metrics.PersistenceCreateFailoverMakerTasksScope, metrics.PersistenceLatency) + err := p.persistence.CreateFailoverMarkerTasks(request) + sw.Stop() + + if err != nil { + p.metricClient.IncCounter(metrics.PersistenceCreateFailoverMakerTasksScope, metrics.PersistenceFailures) + } + + return err +} + func (p *workflowExecutionPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) { p.metricClient.IncCounter(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceRequests) diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index 7fc14f3891b..c5895ef2ea6 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -366,6 +366,15 @@ func (p *workflowExecutionRateLimitedPersistenceClient) RangeDeleteReplicationTa return p.persistence.RangeDeleteReplicationTaskFromDLQ(request) } +func (p *workflowExecutionRateLimitedPersistenceClient) CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error { + if ok := p.rateLimiter.Allow(); !ok { + return ErrPersistenceLimitExceeded + } + + err := p.persistence.CreateFailoverMarkerTasks(request) + return err +} + func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) { if ok := p.rateLimiter.Allow(); !ok { return nil, ErrPersistenceLimitExceeded diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index 9dd53c6d795..5b5ea91ec5e 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -28,6 +28,8 @@ import ( "math" "time" + "github.com/uber/cadence/common/log/tag" + workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" @@ -1095,6 +1097,43 @@ func (m *sqlExecutionManager) RangeDeleteReplicationTaskFromDLQ( return nil } +func (m *sqlExecutionManager) CreateFailoverMarkerTasks( + request *p.CreateFailoverMarkersRequest, +) error { + + tx, err := m.db.BeginTx() + if err != nil { + return err + } + + for _, task := range request.Markers { + t := []p.Task{task} + if err := createReplicationTasks( + tx, + t, + m.shardID, + sqlplugin.MustParseUUID(task.DomainID), + common.EmptyReplicationUUID, + sqlplugin.MustParseUUID(common.EmptyReplicationUUID), + ); err != nil { + rollBackErr := tx.Rollback() + if rollBackErr != nil { + m.logger.Error("transaction rollback error", tag.Error(rollBackErr)) + } + + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("%v: %v", "CreateFailoverMarkerTasks", err), + } + } + } + if err := tx.Commit(); err != nil { + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("%s operation failed. Failed to commit transaction. Error: %v", "CreateFailoverMarkerTasks", err), + } + } + return nil +} + type timerTaskPageToken struct { TaskID int64 Timestamp time.Time diff --git a/common/persistence/sql/sqlExecutionManagerUtil.go b/common/persistence/sql/sqlExecutionManagerUtil.go index 3cab69aa93a..20385a8501b 100644 --- a/common/persistence/sql/sqlExecutionManagerUtil.go +++ b/common/persistence/sql/sqlExecutionManagerUtil.go @@ -851,8 +851,8 @@ func createReplicationTasks( nextEventID := common.EmptyEventID version := common.EmptyVersion activityScheduleID := common.EmptyEventID - var lastReplicationInfo map[string]*sqlblobs.ReplicationInfo + var lastReplicationInfo map[string]*sqlblobs.ReplicationInfo var branchToken, newRunBranchToken []byte var resetWorkflow bool @@ -880,6 +880,11 @@ func createReplicationTasks( activityScheduleID = task.(*p.SyncActivityTask).ScheduledID lastReplicationInfo = map[string]*sqlblobs.ReplicationInfo{} + case p.ReplicationTaskTypeFailoverMarker: + version = task.GetVersion() + // Failover marker uses firstEventID to store visibility timestamp + firstEventID = task.GetVisibilityTimestamp().UnixNano() + default: return &workflow.InternalServiceError{ Message: fmt.Sprintf("Unknown replication task: %v", task.GetType()), From 7da47f1af67cd67a8e0e1c09a3469de9a5108021 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 21 May 2020 16:27:49 -0700 Subject: [PATCH 2/5] Update db schema to have creation time --- .gen/go/sqlblobs/sqlblobs.go | 52 +++++++++++++++++-- .../cassandra/cassandraPersistence.go | 5 +- .../cassandra/cassandraPersistenceUtil.go | 10 +++- common/persistence/dataInterfaces.go | 3 +- common/persistence/sql/sqlExecutionManager.go | 2 + .../sql/sqlExecutionManagerUtil.go | 17 ++++-- schema/cassandra/cadence/schema.cql | 1 + .../cadence/versioned/v0.28/manifest.json | 8 +++ .../v0.28/replication_task_creation_time.cql | 1 + 9 files changed, 86 insertions(+), 13 deletions(-) create mode 100644 schema/cassandra/cadence/versioned/v0.28/manifest.json create mode 100644 schema/cassandra/cadence/versioned/v0.28/replication_task_creation_time.cql diff --git a/.gen/go/sqlblobs/sqlblobs.go b/.gen/go/sqlblobs/sqlblobs.go index 484041ac1e3..816cb0b6489 100644 --- a/.gen/go/sqlblobs/sqlblobs.go +++ b/.gen/go/sqlblobs/sqlblobs.go @@ -3924,6 +3924,7 @@ type ReplicationTaskInfo struct { LastReplicationInfo map[string]*ReplicationInfo `json:"lastReplicationInfo,omitempty"` NewRunBranchToken []byte `json:"newRunBranchToken,omitempty"` ResetWorkflow *bool `json:"resetWorkflow,omitempty"` + CreationTime *int64 `json:"creationTime,omitempty"` } type _Map_String_ReplicationInfo_MapItemList map[string]*ReplicationInfo @@ -3981,7 +3982,7 @@ func (_Map_String_ReplicationInfo_MapItemList) Close() {} // } func (v *ReplicationTaskInfo) ToWire() (wire.Value, error) { var ( - fields [14]wire.Field + fields [15]wire.Field i int = 0 w wire.Value err error @@ -4099,6 +4100,14 @@ func (v *ReplicationTaskInfo) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 36, Value: w} i++ } + if v.CreationTime != nil { + w, err = wire.NewValueI64(*(v.CreationTime)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 38, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } @@ -4288,6 +4297,16 @@ func (v *ReplicationTaskInfo) FromWire(w wire.Value) error { return err } + } + case 38: + if field.Value.Type() == wire.TI64 { + var x int64 + x, err = field.Value.GetI64(), error(nil) + v.CreationTime = &x + if err != nil { + return err + } + } } } @@ -4302,7 +4321,7 @@ func (v *ReplicationTaskInfo) String() string { return "" } - var fields [14]string + var fields [15]string i := 0 if v.DomainID != nil { fields[i] = fmt.Sprintf("DomainID: %v", v.DomainID) @@ -4360,6 +4379,10 @@ func (v *ReplicationTaskInfo) String() string { fields[i] = fmt.Sprintf("ResetWorkflow: %v", *(v.ResetWorkflow)) i++ } + if v.CreationTime != nil { + fields[i] = fmt.Sprintf("CreationTime: %v", *(v.CreationTime)) + i++ + } return fmt.Sprintf("ReplicationTaskInfo{%v}", strings.Join(fields[:i], ", ")) } @@ -4433,6 +4456,9 @@ func (v *ReplicationTaskInfo) Equals(rhs *ReplicationTaskInfo) bool { if !_Bool_EqualsPtr(v.ResetWorkflow, rhs.ResetWorkflow) { return false } + if !_I64_EqualsPtr(v.CreationTime, rhs.CreationTime) { + return false + } return true } @@ -4496,6 +4522,9 @@ func (v *ReplicationTaskInfo) MarshalLogObject(enc zapcore.ObjectEncoder) (err e if v.ResetWorkflow != nil { enc.AddBool("resetWorkflow", *v.ResetWorkflow) } + if v.CreationTime != nil { + enc.AddInt64("creationTime", *v.CreationTime) + } return err } @@ -4709,6 +4738,21 @@ func (v *ReplicationTaskInfo) IsSetResetWorkflow() bool { return v != nil && v.ResetWorkflow != nil } +// GetCreationTime returns the value of CreationTime if it is set or its +// zero value if it is unset. +func (v *ReplicationTaskInfo) GetCreationTime() (o int64) { + if v != nil && v.CreationTime != nil { + return *v.CreationTime + } + + return +} + +// IsSetCreationTime returns true if CreationTime is not nil. +func (v *ReplicationTaskInfo) IsSetCreationTime() bool { + return v != nil && v.CreationTime != nil +} + type RequestCancelInfo struct { Version *int64 `json:"version,omitempty"` InitiatedEventBatchID *int64 `json:"initiatedEventBatchID,omitempty"` @@ -10627,11 +10671,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "sqlblobs", Package: "github.com/uber/cadence/.gen/go/sqlblobs", FilePath: "sqlblobs.thrift", - SHA1: "4a05c49a415530d8204faa62f3d80927a402e4bf", + SHA1: "7f56c61bdb35121135c2b80e31a9a5a633bc78e6", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.sqlblobs\n\ninclude \"shared.thrift\"\n\nstruct ShardInfo {\n 10: optional i32 stolenSinceRenew\n 12: optional i64 (js.type = \"Long\") updatedAtNanos\n 14: optional i64 (js.type = \"Long\") replicationAckLevel\n 16: optional i64 (js.type = \"Long\") transferAckLevel\n 18: optional i64 (js.type = \"Long\") timerAckLevelNanos\n 24: optional i64 (js.type = \"Long\") domainNotificationVersion\n 34: optional map clusterTransferAckLevel\n 36: optional map clusterTimerAckLevel\n 38: optional string owner\n 40: optional map clusterReplicationLevel\n}\n\nstruct DomainInfo {\n 10: optional string name\n 12: optional string description\n 14: optional string owner\n 16: optional i32 status\n 18: optional i16 retentionDays\n 20: optional bool emitMetric\n 22: optional string archivalBucket\n 24: optional i16 archivalStatus\n 26: optional i64 (js.type = \"Long\") configVersion\n 28: optional i64 (js.type = \"Long\") notificationVersion\n 30: optional i64 (js.type = \"Long\") failoverNotificationVersion\n 32: optional i64 (js.type = \"Long\") failoverVersion\n 34: optional string activeClusterName\n 36: optional list clusters\n 38: optional map data\n 39: optional binary badBinaries\n 40: optional string badBinariesEncoding\n 42: optional i16 historyArchivalStatus\n 44: optional string historyArchivalURI\n 46: optional i16 visibilityArchivalStatus\n 48: optional string visibilityArchivalURI\n 50: optional i64 (js.type = \"Long\") failoverEndTime\n}\n\nstruct HistoryTreeInfo {\n 10: optional i64 (js.type = \"Long\") createdTimeNanos // For fork operation to prevent race condition of leaking event data when forking branches fail. Also can be used for clean up leaked data\n 12: optional list ancestors\n 14: optional string info // For lookup back to workflow during debugging, also background cleanup when fork operation cannot finish self cleanup due to crash.\n}\n\nstruct ReplicationInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") lastEventID\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional binary parentDomainID\n 12: optional string parentWorkflowID\n 14: optional binary parentRunID\n 16: optional i64 (js.type = \"Long\") initiatedID\n 18: optional i64 (js.type = \"Long\") completionEventBatchID\n 20: optional binary completionEvent\n 22: optional string completionEventEncoding\n 24: optional string taskList\n 26: optional string workflowTypeName\n 28: optional i32 workflowTimeoutSeconds\n 30: optional i32 decisionTaskTimeoutSeconds\n 32: optional binary executionContext\n 34: optional i32 state\n 36: optional i32 closeStatus\n 38: optional i64 (js.type = \"Long\") startVersion\n 40: optional i64 (js.type = \"Long\") currentVersion\n 44: optional i64 (js.type = \"Long\") lastWriteEventID\n 46: optional map lastReplicationInfo\n 48: optional i64 (js.type = \"Long\") lastEventTaskID\n 50: optional i64 (js.type = \"Long\") lastFirstEventID\n 52: optional i64 (js.type = \"Long\") lastProcessedEvent\n 54: optional i64 (js.type = \"Long\") startTimeNanos\n 56: optional i64 (js.type = \"Long\") lastUpdatedTimeNanos\n 58: optional i64 (js.type = \"Long\") decisionVersion\n 60: optional i64 (js.type = \"Long\") decisionScheduleID\n 62: optional i64 (js.type = \"Long\") decisionStartedID\n 64: optional i32 decisionTimeout\n 66: optional i64 (js.type = \"Long\") decisionAttempt\n 68: optional i64 (js.type = \"Long\") decisionStartedTimestampNanos\n 69: optional i64 (js.type = \"Long\") decisionScheduledTimestampNanos\n 70: optional bool cancelRequested\n 71: optional i64 (js.type = \"Long\") decisionOriginalScheduledTimestampNanos\n 72: optional string createRequestID\n 74: optional string decisionRequestID\n 76: optional string cancelRequestID\n 78: optional string stickyTaskList\n 80: optional i64 (js.type = \"Long\") stickyScheduleToStartTimeout\n 82: optional i64 (js.type = \"Long\") retryAttempt\n 84: optional i32 retryInitialIntervalSeconds\n 86: optional i32 retryMaximumIntervalSeconds\n 88: optional i32 retryMaximumAttempts\n 90: optional i32 retryExpirationSeconds\n 92: optional double retryBackoffCoefficient\n 94: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 96: optional list retryNonRetryableErrors\n 98: optional bool hasRetryPolicy\n 100: optional string cronSchedule\n 102: optional i32 eventStoreVersion\n 104: optional binary eventBranchToken\n 106: optional i64 (js.type = \"Long\") signalCount\n 108: optional i64 (js.type = \"Long\") historySize\n 110: optional string clientLibraryVersion\n 112: optional string clientFeatureVersion\n 114: optional string clientImpl\n 115: optional binary autoResetPoints\n 116: optional string autoResetPointsEncoding\n 118: optional map searchAttributes\n 120: optional map memo\n 122: optional binary versionHistories\n 124: optional string versionHistoriesEncoding\n}\n\nstruct ActivityInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") scheduledEventBatchID\n 14: optional binary scheduledEvent\n 16: optional string scheduledEventEncoding\n 18: optional i64 (js.type = \"Long\") scheduledTimeNanos\n 20: optional i64 (js.type = \"Long\") startedID\n 22: optional binary startedEvent\n 24: optional string startedEventEncoding\n 26: optional i64 (js.type = \"Long\") startedTimeNanos\n 28: optional string activityID\n 30: optional string requestID\n 32: optional i32 scheduleToStartTimeoutSeconds\n 34: optional i32 scheduleToCloseTimeoutSeconds\n 36: optional i32 startToCloseTimeoutSeconds\n 38: optional i32 heartbeatTimeoutSeconds\n 40: optional bool cancelRequested\n 42: optional i64 (js.type = \"Long\") cancelRequestID\n 44: optional i32 timerTaskStatus\n 46: optional i32 attempt\n 48: optional string taskList\n 50: optional string startedIdentity\n 52: optional bool hasRetryPolicy\n 54: optional i32 retryInitialIntervalSeconds\n 56: optional i32 retryMaximumIntervalSeconds\n 58: optional i32 retryMaximumAttempts\n 60: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 62: optional double retryBackoffCoefficient\n 64: optional list retryNonRetryableErrors\n 66: optional string retryLastFailureReason\n 68: optional string retryLastWorkerIdentity\n 70: optional binary retryLastFailureDetails\n}\n\nstruct ChildExecutionInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 14: optional i64 (js.type = \"Long\") startedID\n 16: optional binary initiatedEvent\n 18: optional string initiatedEventEncoding\n 20: optional string startedWorkflowID\n 22: optional binary startedRunID\n 24: optional binary startedEvent\n 26: optional string startedEventEncoding\n 28: optional string createRequestID\n 30: optional string domainName\n 32: optional string workflowTypeName\n 35: optional i32 parentClosePolicy\n}\n\nstruct SignalInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string requestID\n 14: optional string name\n 16: optional binary input\n 18: optional binary control\n}\n\nstruct RequestCancelInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string cancelRequestID\n}\n\nstruct TimerInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") startedID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n // TaskID is a misleading variable, it actually serves\n // the purpose of indicating whether a timer task is\n // generated for this timer info\n 16: optional i64 (js.type = \"Long\") taskID\n}\n\nstruct TaskInfo {\n 10: optional string workflowID\n 12: optional binary runID\n 13: optional i64 (js.type = \"Long\") scheduleID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 15: optional i64 (js.type = \"Long\") createdTimeNanos\n}\n\nstruct TaskListInfo {\n 10: optional i16 kind // {Normal, Sticky}\n 12: optional i64 (js.type = \"Long\") ackLevel\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 16: optional i64 (js.type = \"Long\") lastUpdatedNanos\n}\n\nstruct TransferTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional binary targetDomainID\n 20: optional string targetWorkflowID\n 22: optional binary targetRunID\n 24: optional string taskList\n 26: optional bool targetChildWorkflowOnly\n 28: optional i64 (js.type = \"Long\") scheduleID\n 30: optional i64 (js.type = \"Long\") version\n 32: optional i64 (js.type = \"Long\") visibilityTimestampNanos\n}\n\nstruct TimerTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i16 timeoutType\n 20: optional i64 (js.type = \"Long\") version\n 22: optional i64 (js.type = \"Long\") scheduleAttempt\n 24: optional i64 (js.type = \"Long\") eventID\n}\n\nstruct ReplicationTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") firstEventID\n 22: optional i64 (js.type = \"Long\") nextEventID\n 24: optional i64 (js.type = \"Long\") scheduledID\n 26: optional i32 eventStoreVersion\n 28: optional i32 newRunEventStoreVersion\n 30: optional binary branch_token\n 32: optional map lastReplicationInfo\n 34: optional binary newRunBranchToken\n 36: optional bool resetWorkflow\n}" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.sqlblobs\n\ninclude \"shared.thrift\"\n\nstruct ShardInfo {\n 10: optional i32 stolenSinceRenew\n 12: optional i64 (js.type = \"Long\") updatedAtNanos\n 14: optional i64 (js.type = \"Long\") replicationAckLevel\n 16: optional i64 (js.type = \"Long\") transferAckLevel\n 18: optional i64 (js.type = \"Long\") timerAckLevelNanos\n 24: optional i64 (js.type = \"Long\") domainNotificationVersion\n 34: optional map clusterTransferAckLevel\n 36: optional map clusterTimerAckLevel\n 38: optional string owner\n 40: optional map clusterReplicationLevel\n}\n\nstruct DomainInfo {\n 10: optional string name\n 12: optional string description\n 14: optional string owner\n 16: optional i32 status\n 18: optional i16 retentionDays\n 20: optional bool emitMetric\n 22: optional string archivalBucket\n 24: optional i16 archivalStatus\n 26: optional i64 (js.type = \"Long\") configVersion\n 28: optional i64 (js.type = \"Long\") notificationVersion\n 30: optional i64 (js.type = \"Long\") failoverNotificationVersion\n 32: optional i64 (js.type = \"Long\") failoverVersion\n 34: optional string activeClusterName\n 36: optional list clusters\n 38: optional map data\n 39: optional binary badBinaries\n 40: optional string badBinariesEncoding\n 42: optional i16 historyArchivalStatus\n 44: optional string historyArchivalURI\n 46: optional i16 visibilityArchivalStatus\n 48: optional string visibilityArchivalURI\n 50: optional i64 (js.type = \"Long\") failoverEndTime\n}\n\nstruct HistoryTreeInfo {\n 10: optional i64 (js.type = \"Long\") createdTimeNanos // For fork operation to prevent race condition of leaking event data when forking branches fail. Also can be used for clean up leaked data\n 12: optional list ancestors\n 14: optional string info // For lookup back to workflow during debugging, also background cleanup when fork operation cannot finish self cleanup due to crash.\n}\n\nstruct ReplicationInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") lastEventID\n}\n\nstruct WorkflowExecutionInfo {\n 10: optional binary parentDomainID\n 12: optional string parentWorkflowID\n 14: optional binary parentRunID\n 16: optional i64 (js.type = \"Long\") initiatedID\n 18: optional i64 (js.type = \"Long\") completionEventBatchID\n 20: optional binary completionEvent\n 22: optional string completionEventEncoding\n 24: optional string taskList\n 26: optional string workflowTypeName\n 28: optional i32 workflowTimeoutSeconds\n 30: optional i32 decisionTaskTimeoutSeconds\n 32: optional binary executionContext\n 34: optional i32 state\n 36: optional i32 closeStatus\n 38: optional i64 (js.type = \"Long\") startVersion\n 40: optional i64 (js.type = \"Long\") currentVersion\n 44: optional i64 (js.type = \"Long\") lastWriteEventID\n 46: optional map lastReplicationInfo\n 48: optional i64 (js.type = \"Long\") lastEventTaskID\n 50: optional i64 (js.type = \"Long\") lastFirstEventID\n 52: optional i64 (js.type = \"Long\") lastProcessedEvent\n 54: optional i64 (js.type = \"Long\") startTimeNanos\n 56: optional i64 (js.type = \"Long\") lastUpdatedTimeNanos\n 58: optional i64 (js.type = \"Long\") decisionVersion\n 60: optional i64 (js.type = \"Long\") decisionScheduleID\n 62: optional i64 (js.type = \"Long\") decisionStartedID\n 64: optional i32 decisionTimeout\n 66: optional i64 (js.type = \"Long\") decisionAttempt\n 68: optional i64 (js.type = \"Long\") decisionStartedTimestampNanos\n 69: optional i64 (js.type = \"Long\") decisionScheduledTimestampNanos\n 70: optional bool cancelRequested\n 71: optional i64 (js.type = \"Long\") decisionOriginalScheduledTimestampNanos\n 72: optional string createRequestID\n 74: optional string decisionRequestID\n 76: optional string cancelRequestID\n 78: optional string stickyTaskList\n 80: optional i64 (js.type = \"Long\") stickyScheduleToStartTimeout\n 82: optional i64 (js.type = \"Long\") retryAttempt\n 84: optional i32 retryInitialIntervalSeconds\n 86: optional i32 retryMaximumIntervalSeconds\n 88: optional i32 retryMaximumAttempts\n 90: optional i32 retryExpirationSeconds\n 92: optional double retryBackoffCoefficient\n 94: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 96: optional list retryNonRetryableErrors\n 98: optional bool hasRetryPolicy\n 100: optional string cronSchedule\n 102: optional i32 eventStoreVersion\n 104: optional binary eventBranchToken\n 106: optional i64 (js.type = \"Long\") signalCount\n 108: optional i64 (js.type = \"Long\") historySize\n 110: optional string clientLibraryVersion\n 112: optional string clientFeatureVersion\n 114: optional string clientImpl\n 115: optional binary autoResetPoints\n 116: optional string autoResetPointsEncoding\n 118: optional map searchAttributes\n 120: optional map memo\n 122: optional binary versionHistories\n 124: optional string versionHistoriesEncoding\n}\n\nstruct ActivityInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") scheduledEventBatchID\n 14: optional binary scheduledEvent\n 16: optional string scheduledEventEncoding\n 18: optional i64 (js.type = \"Long\") scheduledTimeNanos\n 20: optional i64 (js.type = \"Long\") startedID\n 22: optional binary startedEvent\n 24: optional string startedEventEncoding\n 26: optional i64 (js.type = \"Long\") startedTimeNanos\n 28: optional string activityID\n 30: optional string requestID\n 32: optional i32 scheduleToStartTimeoutSeconds\n 34: optional i32 scheduleToCloseTimeoutSeconds\n 36: optional i32 startToCloseTimeoutSeconds\n 38: optional i32 heartbeatTimeoutSeconds\n 40: optional bool cancelRequested\n 42: optional i64 (js.type = \"Long\") cancelRequestID\n 44: optional i32 timerTaskStatus\n 46: optional i32 attempt\n 48: optional string taskList\n 50: optional string startedIdentity\n 52: optional bool hasRetryPolicy\n 54: optional i32 retryInitialIntervalSeconds\n 56: optional i32 retryMaximumIntervalSeconds\n 58: optional i32 retryMaximumAttempts\n 60: optional i64 (js.type = \"Long\") retryExpirationTimeNanos\n 62: optional double retryBackoffCoefficient\n 64: optional list retryNonRetryableErrors\n 66: optional string retryLastFailureReason\n 68: optional string retryLastWorkerIdentity\n 70: optional binary retryLastFailureDetails\n}\n\nstruct ChildExecutionInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 14: optional i64 (js.type = \"Long\") startedID\n 16: optional binary initiatedEvent\n 18: optional string initiatedEventEncoding\n 20: optional string startedWorkflowID\n 22: optional binary startedRunID\n 24: optional binary startedEvent\n 26: optional string startedEventEncoding\n 28: optional string createRequestID\n 30: optional string domainName\n 32: optional string workflowTypeName\n 35: optional i32 parentClosePolicy\n}\n\nstruct SignalInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string requestID\n 14: optional string name\n 16: optional binary input\n 18: optional binary control\n}\n\nstruct RequestCancelInfo {\n 10: optional i64 (js.type = \"Long\") version\n 11: optional i64 (js.type = \"Long\") initiatedEventBatchID\n 12: optional string cancelRequestID\n}\n\nstruct TimerInfo {\n 10: optional i64 (js.type = \"Long\") version\n 12: optional i64 (js.type = \"Long\") startedID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n // TaskID is a misleading variable, it actually serves\n // the purpose of indicating whether a timer task is\n // generated for this timer info\n 16: optional i64 (js.type = \"Long\") taskID\n}\n\nstruct TaskInfo {\n 10: optional string workflowID\n 12: optional binary runID\n 13: optional i64 (js.type = \"Long\") scheduleID\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 15: optional i64 (js.type = \"Long\") createdTimeNanos\n}\n\nstruct TaskListInfo {\n 10: optional i16 kind // {Normal, Sticky}\n 12: optional i64 (js.type = \"Long\") ackLevel\n 14: optional i64 (js.type = \"Long\") expiryTimeNanos\n 16: optional i64 (js.type = \"Long\") lastUpdatedNanos\n}\n\nstruct TransferTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional binary targetDomainID\n 20: optional string targetWorkflowID\n 22: optional binary targetRunID\n 24: optional string taskList\n 26: optional bool targetChildWorkflowOnly\n 28: optional i64 (js.type = \"Long\") scheduleID\n 30: optional i64 (js.type = \"Long\") version\n 32: optional i64 (js.type = \"Long\") visibilityTimestampNanos\n}\n\nstruct TimerTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i16 timeoutType\n 20: optional i64 (js.type = \"Long\") version\n 22: optional i64 (js.type = \"Long\") scheduleAttempt\n 24: optional i64 (js.type = \"Long\") eventID\n}\n\nstruct ReplicationTaskInfo {\n 10: optional binary domainID\n 12: optional string workflowID\n 14: optional binary runID\n 16: optional i16 taskType\n 18: optional i64 (js.type = \"Long\") version\n 20: optional i64 (js.type = \"Long\") firstEventID\n 22: optional i64 (js.type = \"Long\") nextEventID\n 24: optional i64 (js.type = \"Long\") scheduledID\n 26: optional i32 eventStoreVersion\n 28: optional i32 newRunEventStoreVersion\n 30: optional binary branch_token\n 32: optional map lastReplicationInfo\n 34: optional binary newRunBranchToken\n 36: optional bool resetWorkflow\n 38: optional i64 (js.type = \"Long\") creationTime\n}" diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index f33541b8e7f..55279753681 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -214,7 +214,8 @@ const ( `branch_token: ?, ` + `reset_workflow: ?, ` + `new_run_event_store_version: ?, ` + - `new_run_branch_token: ? ` + + `new_run_branch_token: ?, ` + + `created_time: ? ` + `}` templateTimerTaskType = `{` + @@ -2801,6 +2802,7 @@ func (d *cassandraPersistence) PutReplicationTaskToDLQ(request *p.PutReplication p.EventStoreVersion, task.NewRunBranchToken, defaultVisibilityTimestamp, + defaultVisibilityTimestamp, task.GetTaskID()) err := query.Exec() @@ -2907,6 +2909,7 @@ func (d *cassandraPersistence) CreateFailoverMarkerTasks( task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, + task.GetVisibilityTimestamp().UnixNano(), ); err != nil { return err } diff --git a/common/persistence/cassandra/cassandraPersistenceUtil.go b/common/persistence/cassandra/cassandraPersistenceUtil.go index 6014b2dcd5d..217e356d20a 100644 --- a/common/persistence/cassandra/cassandraPersistenceUtil.go +++ b/common/persistence/cassandra/cassandraPersistenceUtil.go @@ -145,6 +145,7 @@ func applyWorkflowMutationBatch( workflowMutation.TransferTasks, workflowMutation.ReplicationTasks, workflowMutation.TimerTasks, + cqlNowTimestampMillis, ) } @@ -253,6 +254,7 @@ func applyWorkflowSnapshotBatchAsReset( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, + cqlNowTimestampMillis, ) } @@ -357,6 +359,7 @@ func applyWorkflowSnapshotBatchAsNew( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, + cqlNowTimestampMillis, ) } @@ -919,6 +922,7 @@ func applyTasks( transferTasks []p.Task, replicationTasks []p.Task, timerTasks []p.Task, + creationTime int64, ) error { if err := createTransferTasks( @@ -939,6 +943,7 @@ func applyTasks( domainID, workflowID, runID, + creationTime, ); err != nil { return err } @@ -1054,6 +1059,7 @@ func createReplicationTasks( domainID string, workflowID string, runID string, + creationTime int64, ) error { for _, task := range replicationTasks { @@ -1088,8 +1094,7 @@ func createReplicationTasks( case p.ReplicationTaskTypeFailoverMarker: version = task.GetVersion() - // Failover marker uses firstEventID to store visibility timestamp - firstEventID = task.GetVisibilityTimestamp().UnixNano() + creationTime = task.GetVisibilityTimestamp().UnixNano() default: return &workflow.InternalServiceError{ @@ -1118,6 +1123,7 @@ func createReplicationTasks( resetWorkflow, p.EventStoreVersion, newRunBranchToken, + creationTime, defaultVisibilityTimestamp, task.GetTaskID()) } diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 1ebf3f5cf52..fe7cde98094 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -384,6 +384,7 @@ type ( BranchToken []byte NewRunBranchToken []byte ResetWorkflow bool + CreationTime int64 // TODO deprecate when NDC is fully released && migrated LastReplicationInfo map[string]*ReplicationInfo @@ -609,7 +610,7 @@ type ( // FailoverMarkerTask is the marker for graceful failover FailoverMarkerTask struct { TaskID int64 - VisibilityTimestamp time.Time // Visibility timestamp stores in the field scheduleEventID + VisibilityTimestamp time.Time // Visibility timestamp stores in the field scheduleEventID Version int64 DomainID string } diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index 5b5ea91ec5e..7f3037b7ea6 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -985,6 +985,7 @@ func (m *sqlExecutionManager) populateGetReplicationTasksResponse( BranchToken: info.GetBranchToken(), NewRunBranchToken: info.GetNewRunBranchToken(), ResetWorkflow: info.GetResetWorkflow(), + CreationTime: info.GetCreationTime(), } } var nextPageToken []byte @@ -1115,6 +1116,7 @@ func (m *sqlExecutionManager) CreateFailoverMarkerTasks( sqlplugin.MustParseUUID(task.DomainID), common.EmptyReplicationUUID, sqlplugin.MustParseUUID(common.EmptyReplicationUUID), + task.GetVisibilityTimestamp().UnixNano(), ); err != nil { rollBackErr := tx.Rollback() if rollBackErr != nil { diff --git a/common/persistence/sql/sqlExecutionManagerUtil.go b/common/persistence/sql/sqlExecutionManagerUtil.go index 20385a8501b..9a92065b1e8 100644 --- a/common/persistence/sql/sqlExecutionManagerUtil.go +++ b/common/persistence/sql/sqlExecutionManagerUtil.go @@ -92,7 +92,8 @@ func applyWorkflowMutationTx( runID, workflowMutation.TransferTasks, workflowMutation.ReplicationTasks, - workflowMutation.TimerTasks); err != nil { + workflowMutation.TimerTasks, + executionInfo.LastUpdatedTimestamp.UnixNano()); err != nil { return err } @@ -252,7 +253,8 @@ func applyWorkflowSnapshotTxAsReset( runID, workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, - workflowSnapshot.TimerTasks); err != nil { + workflowSnapshot.TimerTasks, + executionInfo.LastUpdatedTimestamp.UnixNano()); err != nil { return err } @@ -440,7 +442,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( runID, workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, - workflowSnapshot.TimerTasks); err != nil { + workflowSnapshot.TimerTasks, + executionInfo.LastUpdatedTimestamp.UnixNano()); err != nil { return err } @@ -528,6 +531,7 @@ func applyTasks( transferTasks []p.Task, replicationTasks []p.Task, timerTasks []p.Task, + creationTime int64, ) error { if err := createTransferTasks(tx, @@ -546,7 +550,8 @@ func applyTasks( shardID, domainID, workflowID, - runID); err != nil { + runID, + creationTime); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyTasks failed. Failed to create replication tasks. Error: %v", err), } @@ -838,6 +843,7 @@ func createReplicationTasks( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, + creationTime int64, ) error { if len(replicationTasks) == 0 { @@ -883,7 +889,7 @@ func createReplicationTasks( case p.ReplicationTaskTypeFailoverMarker: version = task.GetVersion() // Failover marker uses firstEventID to store visibility timestamp - firstEventID = task.GetVisibilityTimestamp().UnixNano() + creationTime = task.GetVisibilityTimestamp().UnixNano() default: return &workflow.InternalServiceError{ @@ -906,6 +912,7 @@ func createReplicationTasks( BranchToken: branchToken, NewRunBranchToken: newRunBranchToken, ResetWorkflow: &resetWorkflow, + CreationTime: common.Int64Ptr(creationTime), }) if err != nil { return err diff --git a/schema/cassandra/cadence/schema.cql b/schema/cassandra/cadence/schema.cql index 91a537e2b44..ba39ffa14c8 100644 --- a/schema/cassandra/cadence/schema.cql +++ b/schema/cassandra/cadence/schema.cql @@ -132,6 +132,7 @@ CREATE TYPE replication_task ( new_run_event_store_version int, -- indicates which version of event store to query for new run(continueAsNew) new_run_branch_token blob, -- if eventV2, then query with this token for new run(continueAsNew) reset_workflow boolean, -- whether the task is for resetWorkflowExecution + created_time bigint, -- task creation timestamp ); CREATE TYPE timer_task ( diff --git a/schema/cassandra/cadence/versioned/v0.28/manifest.json b/schema/cassandra/cadence/versioned/v0.28/manifest.json new file mode 100644 index 00000000000..1a99826a46c --- /dev/null +++ b/schema/cassandra/cadence/versioned/v0.28/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.28", + "MinCompatibleVersion": "0.27", + "Description": "Add creation timestamp in replication task type", + "SchemaUpdateCqlFiles": [ + "replication_task_creation_time.cql" + ] +} \ No newline at end of file diff --git a/schema/cassandra/cadence/versioned/v0.28/replication_task_creation_time.cql b/schema/cassandra/cadence/versioned/v0.28/replication_task_creation_time.cql new file mode 100644 index 00000000000..ea9f0e98efe --- /dev/null +++ b/schema/cassandra/cadence/versioned/v0.28/replication_task_creation_time.cql @@ -0,0 +1 @@ +ALTER TYPE replication_task ADD created_time bigint; \ No newline at end of file From bf2ff9470aca34d001de0af0f4cad92cbaa2b7f3 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Tue, 26 May 2020 13:16:56 -0700 Subject: [PATCH 3/5] Address comments --- common/constants.go | 2 -- .../persistence/cassandra/cassandraPersistence.go | 13 ++++++++++++- common/persistence/dataInterfaces.go | 2 +- common/persistence/sql/sqlExecutionManager.go | 12 ++++++++---- common/persistence/sql/sqlExecutionManagerUtil.go | 1 - schema/cassandra/version.go | 2 +- service/history/resource/resourceTest.go | 1 + 7 files changed, 23 insertions(+), 10 deletions(-) diff --git a/common/constants.go b/common/constants.go index e3a4229ce5a..bc240e52568 100644 --- a/common/constants.go +++ b/common/constants.go @@ -52,8 +52,6 @@ const ( const ( // EmptyUUID is the placeholder for UUID when it's empty EmptyUUID = "emptyUuid" - // EmptyReplicationUUID is the placeholder for replication task UUID when it's empty - EmptyReplicationUUID = "30000000-5000-f000-f000-000000000000" ) const ( diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index 55279753681..37df8e9b063 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -2915,5 +2915,16 @@ func (d *cassandraPersistence) CreateFailoverMarkerTasks( } } - return d.session.ExecuteBatch(batch) + err := d.session.ExecuteBatch(batch) + if err != nil { + if isThrottlingError(err) { + return &workflow.ServiceBusyError{ + Message: fmt.Sprintf("CreateFailoverMarkerTasks operation failed. Error: %v", err), + } + } + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("CreateFailoverMarkerTasks operation failed. Error: %v", err), + } + } + return nil } diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index fe7cde98094..996cd2bccd1 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -610,7 +610,7 @@ type ( // FailoverMarkerTask is the marker for graceful failover FailoverMarkerTask struct { TaskID int64 - VisibilityTimestamp time.Time // Visibility timestamp stores in the field scheduleEventID + VisibilityTimestamp time.Time Version int64 DomainID string } diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index 7f3037b7ea6..24e58f61013 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -28,17 +28,21 @@ import ( "math" "time" - "github.com/uber/cadence/common/log/tag" - workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" p "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/sql/sqlplugin" ) +const ( + emptyWorkflowID string = "" + emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000" +) + type sqlExecutionManager struct { sqlStore shardID int @@ -1114,8 +1118,8 @@ func (m *sqlExecutionManager) CreateFailoverMarkerTasks( t, m.shardID, sqlplugin.MustParseUUID(task.DomainID), - common.EmptyReplicationUUID, - sqlplugin.MustParseUUID(common.EmptyReplicationUUID), + emptyWorkflowID, + sqlplugin.MustParseUUID(emptyReplicationRunID), task.GetVisibilityTimestamp().UnixNano(), ); err != nil { rollBackErr := tx.Rollback() diff --git a/common/persistence/sql/sqlExecutionManagerUtil.go b/common/persistence/sql/sqlExecutionManagerUtil.go index 9a92065b1e8..861d4947cef 100644 --- a/common/persistence/sql/sqlExecutionManagerUtil.go +++ b/common/persistence/sql/sqlExecutionManagerUtil.go @@ -888,7 +888,6 @@ func createReplicationTasks( case p.ReplicationTaskTypeFailoverMarker: version = task.GetVersion() - // Failover marker uses firstEventID to store visibility timestamp creationTime = task.GetVisibilityTimestamp().UnixNano() default: diff --git a/schema/cassandra/version.go b/schema/cassandra/version.go index 4a5ff3e4aef..4c2a169b929 100644 --- a/schema/cassandra/version.go +++ b/schema/cassandra/version.go @@ -23,7 +23,7 @@ package cassandra // NOTE: whenever there is a new data base schema update, plz update the following versions // Version is the Cassandra database release version -const Version = "0.27" +const Version = "0.28" // VisibilityVersion is the Cassandra visibility database release version const VisibilityVersion = "0.5" diff --git a/service/history/resource/resourceTest.go b/service/history/resource/resourceTest.go index 3dd70f341bb..2bdbb1da83d 100644 --- a/service/history/resource/resourceTest.go +++ b/service/history/resource/resourceTest.go @@ -22,6 +22,7 @@ package resource import ( "github.com/golang/mock/gomock" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/resource" "github.com/uber/cadence/service/history/events" From 496f44927dc6139e8879a805d3a708afcd5bef01 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Tue, 26 May 2020 23:57:40 -0700 Subject: [PATCH 4/5] Persist replication visibility timestamp --- .../persistence/cassandra/cassandraPersistence.go | 1 - .../cassandra/cassandraPersistenceUtil.go | 9 +-------- common/persistence/sql/sqlExecutionManager.go | 1 - common/persistence/sql/sqlExecutionManagerUtil.go | 13 +++++-------- 4 files changed, 6 insertions(+), 18 deletions(-) diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index 37df8e9b063..03daefabd8d 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -2909,7 +2909,6 @@ func (d *cassandraPersistence) CreateFailoverMarkerTasks( task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, - task.GetVisibilityTimestamp().UnixNano(), ); err != nil { return err } diff --git a/common/persistence/cassandra/cassandraPersistenceUtil.go b/common/persistence/cassandra/cassandraPersistenceUtil.go index 217e356d20a..343c6e74ca9 100644 --- a/common/persistence/cassandra/cassandraPersistenceUtil.go +++ b/common/persistence/cassandra/cassandraPersistenceUtil.go @@ -145,7 +145,6 @@ func applyWorkflowMutationBatch( workflowMutation.TransferTasks, workflowMutation.ReplicationTasks, workflowMutation.TimerTasks, - cqlNowTimestampMillis, ) } @@ -254,7 +253,6 @@ func applyWorkflowSnapshotBatchAsReset( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, - cqlNowTimestampMillis, ) } @@ -359,7 +357,6 @@ func applyWorkflowSnapshotBatchAsNew( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, - cqlNowTimestampMillis, ) } @@ -922,7 +919,6 @@ func applyTasks( transferTasks []p.Task, replicationTasks []p.Task, timerTasks []p.Task, - creationTime int64, ) error { if err := createTransferTasks( @@ -943,7 +939,6 @@ func applyTasks( domainID, workflowID, runID, - creationTime, ); err != nil { return err } @@ -1059,7 +1054,6 @@ func createReplicationTasks( domainID string, workflowID string, runID string, - creationTime int64, ) error { for _, task := range replicationTasks { @@ -1094,7 +1088,6 @@ func createReplicationTasks( case p.ReplicationTaskTypeFailoverMarker: version = task.GetVersion() - creationTime = task.GetVisibilityTimestamp().UnixNano() default: return &workflow.InternalServiceError{ @@ -1123,7 +1116,7 @@ func createReplicationTasks( resetWorkflow, p.EventStoreVersion, newRunBranchToken, - creationTime, + task.GetVisibilityTimestamp().UnixNano(), defaultVisibilityTimestamp, task.GetTaskID()) } diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index 24e58f61013..9e856fee5cb 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -1120,7 +1120,6 @@ func (m *sqlExecutionManager) CreateFailoverMarkerTasks( sqlplugin.MustParseUUID(task.DomainID), emptyWorkflowID, sqlplugin.MustParseUUID(emptyReplicationRunID), - task.GetVisibilityTimestamp().UnixNano(), ); err != nil { rollBackErr := tx.Rollback() if rollBackErr != nil { diff --git a/common/persistence/sql/sqlExecutionManagerUtil.go b/common/persistence/sql/sqlExecutionManagerUtil.go index 861d4947cef..779261f4488 100644 --- a/common/persistence/sql/sqlExecutionManagerUtil.go +++ b/common/persistence/sql/sqlExecutionManagerUtil.go @@ -93,7 +93,7 @@ func applyWorkflowMutationTx( workflowMutation.TransferTasks, workflowMutation.ReplicationTasks, workflowMutation.TimerTasks, - executionInfo.LastUpdatedTimestamp.UnixNano()); err != nil { + ); err != nil { return err } @@ -254,7 +254,7 @@ func applyWorkflowSnapshotTxAsReset( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, - executionInfo.LastUpdatedTimestamp.UnixNano()); err != nil { + ); err != nil { return err } @@ -443,7 +443,7 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew( workflowSnapshot.TransferTasks, workflowSnapshot.ReplicationTasks, workflowSnapshot.TimerTasks, - executionInfo.LastUpdatedTimestamp.UnixNano()); err != nil { + ); err != nil { return err } @@ -531,7 +531,6 @@ func applyTasks( transferTasks []p.Task, replicationTasks []p.Task, timerTasks []p.Task, - creationTime int64, ) error { if err := createTransferTasks(tx, @@ -551,7 +550,7 @@ func applyTasks( domainID, workflowID, runID, - creationTime); err != nil { + ); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("applyTasks failed. Failed to create replication tasks. Error: %v", err), } @@ -843,7 +842,6 @@ func createReplicationTasks( domainID sqlplugin.UUID, workflowID string, runID sqlplugin.UUID, - creationTime int64, ) error { if len(replicationTasks) == 0 { @@ -888,7 +886,6 @@ func createReplicationTasks( case p.ReplicationTaskTypeFailoverMarker: version = task.GetVersion() - creationTime = task.GetVisibilityTimestamp().UnixNano() default: return &workflow.InternalServiceError{ @@ -911,7 +908,7 @@ func createReplicationTasks( BranchToken: branchToken, NewRunBranchToken: newRunBranchToken, ResetWorkflow: &resetWorkflow, - CreationTime: common.Int64Ptr(creationTime), + CreationTime: common.Int64Ptr(task.GetVisibilityTimestamp().UnixNano()), }) if err != nil { return err From 4a03fbb7fd633b1731969f8d486f98770322cafb Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Wed, 27 May 2020 12:56:10 -0700 Subject: [PATCH 5/5] Pin cadence idl changes --- idls | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/idls b/idls index 805b2635b6d..365a085a8e9 160000 --- a/idls +++ b/idls @@ -1 +1 @@ -Subproject commit 805b2635b6deeda894cff4fd0cde13614a32df8c +Subproject commit 365a085a8e9401f12632b89f801dd99560ff7eba