Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Failover marker persistence #3274

Merged
merged 12 commits into from
May 27, 2020
Merged
52 changes: 48 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ const (
PersistenceDeleteReplicationTaskFromDLQScope
// PersistenceRangeDeleteReplicationTaskFromDLQScope tracks PersistenceRangeDeleteReplicationTaskFromDLQScope calls made by service to persistence layer
PersistenceRangeDeleteReplicationTaskFromDLQScope
// PersistenceCreateFailoverMakerTasksScope tracks CreateFailoverMarkerTasks calls made by service to persistence layer
PersistenceCreateFailoverMakerTasksScope
// PersistenceGetTimerIndexTasksScope tracks GetTimerIndexTasks calls made by service to persistence layer
PersistenceGetTimerIndexTasksScope
// PersistenceCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1067,6 +1069,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetReplicationTasksFromDLQScope: {operation: "GetReplicationTasksFromDLQ"},
PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"},
PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"},
PersistenceCreateFailoverMakerTasksScope: {operation: "CreateFailoverMarkerTasks"},
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"},
Expand Down
14 changes: 14 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,20 @@ func (_m *ExecutionManager) RangeCompleteTimerTask(request *persistence.RangeCom
return r0
}

// CreateFailoverMarkerTasks provides a mock function with given fields: request
//
// The call is recorded through Called. If the first configured return value is
// a function with the matching signature, it is invoked with request and its
// result is returned; otherwise the first return value is returned as an error.
func (_m *ExecutionManager) CreateFailoverMarkerTasks(request *persistence.CreateFailoverMarkersRequest) error {
	ret := _m.Called(request)

	if rf, ok := ret.Get(0).(func(*persistence.CreateFailoverMarkersRequest) error); ok {
		return rf(request)
	}
	return ret.Error(0)
}

// Close provides a mock function with given fields:
func (_m *ExecutionManager) Close() {
_m.Called()
Expand Down
37 changes: 36 additions & 1 deletion common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ const (
`branch_token: ?, ` +
`reset_workflow: ?, ` +
`new_run_event_store_version: ?, ` +
`new_run_branch_token: ? ` +
`new_run_branch_token: ?, ` +
`created_time: ? ` +
`}`

templateTimerTaskType = `{` +
Expand Down Expand Up @@ -2801,6 +2802,7 @@ func (d *cassandraPersistence) PutReplicationTaskToDLQ(request *p.PutReplication
p.EventStoreVersion,
task.NewRunBranchToken,
defaultVisibilityTimestamp,
defaultVisibilityTimestamp,
task.GetTaskID())

err := query.Exec()
Expand Down Expand Up @@ -2892,3 +2894,36 @@ func (d *cassandraPersistence) RangeDeleteReplicationTaskFromDLQ(
}
return nil
}

func (d *cassandraPersistence) CreateFailoverMarkerTasks(
request *p.CreateFailoverMarkersRequest,
) error {

batch := d.session.NewBatch(gocql.LoggedBatch)
for _, task := range request.Markers {
t := []p.Task{task}
yux0 marked this conversation as resolved.
Show resolved Hide resolved
if err := createReplicationTasks(
batch,
t,
d.shardID,
task.DomainID,
rowTypeReplicationWorkflowID,
rowTypeReplicationRunID,
); err != nil {
return err
}
}

err := d.session.ExecuteBatch(batch)
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("CreateFailoverMarkerTasks operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("CreateFailoverMarkerTasks operation failed. Error: %v", err),
}
}
return nil
}
4 changes: 4 additions & 0 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,9 @@ func createReplicationTasks(
// cassandra does not like null
lastReplicationInfo = make(map[string]map[string]interface{})

case p.ReplicationTaskTypeFailoverMarker:
version = task.GetVersion()

default:
return &workflow.InternalServiceError{
Message: fmt.Sprintf("Unknow replication type: %v", task.GetType()),
Expand Down Expand Up @@ -1113,6 +1116,7 @@ func createReplicationTasks(
resetWorkflow,
p.EventStoreVersion,
newRunBranchToken,
task.GetVisibilityTimestamp().UnixNano(),
defaultVisibilityTimestamp,
task.GetTaskID())
}
Expand Down
8 changes: 7 additions & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ type (
BranchToken []byte
NewRunBranchToken []byte
ResetWorkflow bool
CreationTime int64

// TODO deprecate when NDC is fully released && migrated
LastReplicationInfo map[string]*ReplicationInfo
Expand Down Expand Up @@ -611,7 +612,6 @@ type (
TaskID int64
VisibilityTimestamp time.Time
Version int64
SourceCluster string
DomainID string
}

Expand Down Expand Up @@ -1474,6 +1474,11 @@ type (
Branches []HistoryBranchDetail
}

// CreateFailoverMarkersRequest is the request passed to CreateFailoverMarkerTasks
// to create failover markers
CreateFailoverMarkersRequest struct {
// Markers are the failover marker tasks to be created
Markers []*FailoverMarkerTask
}

// Closeable is an interface for any entity that supports a close operation to release resources
Closeable interface {
Close()
Expand Down Expand Up @@ -1516,6 +1521,7 @@ type (
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error

// Timer related methods.
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,12 @@ func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
return m.persistence.RangeDeleteReplicationTaskFromDLQ(request)
}

// CreateFailoverMarkerTasks hands the request to the underlying persistence
// store and returns whatever error that store reports.
func (m *executionManagerImpl) CreateFailoverMarkerTasks(
	request *CreateFailoverMarkersRequest,
) error {
	err := m.persistence.CreateFailoverMarkerTasks(request)
	return err
}

// Timer related methods.
func (m *executionManagerImpl) GetTimerIndexTasks(
request *GetTimerIndexTasksRequest,
Expand Down
23 changes: 23 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5280,6 +5280,29 @@ func (s *ExecutionManagerSuite) TestReplicationDLQ() {
s.Len(resp.Tasks, 0)
}

// TestCreateFailoverMarkerTasks creates a single failover marker through
// CreateFailoverMarkers and checks that GetReplicationTasks returns exactly one
// task carrying the same version, task ID and domain ID, typed as a failover
// marker replication task.
func (s *ExecutionManagerSuite) TestCreateFailoverMarkerTasks() {
	domainID := uuid.New()
	markers := []*p.FailoverMarkerTask{
		{
			TaskID:              1,
			VisibilityTimestamp: time.Now(),
			DomainID:            domainID,
			Version:             1,
		},
	}
	err := s.CreateFailoverMarkers(markers)
	s.NoError(err)

	tasks, err := s.GetReplicationTasks(1, true)
	s.NoError(err)
	// Len reports the actual length on failure; Equal takes (expected, actual),
	// so the expected value goes first to keep failure messages correct.
	s.Len(tasks, 1)
	s.Equal(int64(1), tasks[0].Version)
	s.Equal(int64(1), tasks[0].TaskID)
	s.Equal(domainID, tasks[0].DomainID)
	s.Equal(p.ReplicationTaskTypeFailoverMarker, tasks[0].TaskType)
}

func copyWorkflowExecutionInfo(sourceInfo *p.WorkflowExecutionInfo) *p.WorkflowExecutionInfo {
return &p.WorkflowExecutionInfo{
DomainID: sourceInfo.DomainID,
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,16 @@ func (s *TestBase) RangeDeleteReplicationTaskFromDLQ(
})
}

// CreateFailoverMarkers is a utility method that wraps the given markers in a
// CreateFailoverMarkersRequest and submits it to the ExecutionManager
func (s *TestBase) CreateFailoverMarkers(
	markers []*p.FailoverMarkerTask,
) error {

	request := &p.CreateFailoverMarkersRequest{Markers: markers}
	return s.ExecutionManager.CreateFailoverMarkerTasks(request)
}

// CompleteTransferTask is a utility method to complete a transfer task
func (s *TestBase) CompleteTransferTask(taskID int64) error {

Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type (
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error

// Timer related methods.
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,20 @@ func (p *workflowExecutionPersistenceClient) RangeDeleteReplicationTaskFromDLQ(
return nil
}

// CreateFailoverMarkerTasks forwards the request to the wrapped persistence
// client while emitting metrics under PersistenceCreateFailoverMakerTasksScope:
// a request counter before the call, a latency timer around it, and a failure
// counter when the call returns an error.
func (p *workflowExecutionPersistenceClient) CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error {
	const scope = metrics.PersistenceCreateFailoverMakerTasksScope

	p.metricClient.IncCounter(scope, metrics.PersistenceRequests)

	stopwatch := p.metricClient.StartTimer(scope, metrics.PersistenceLatency)
	err := p.persistence.CreateFailoverMarkerTasks(request)
	stopwatch.Stop()

	if err == nil {
		return nil
	}

	p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
	return err
}

func (p *workflowExecutionPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceRequests)

Expand Down
9 changes: 9 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,15 @@ func (p *workflowExecutionRateLimitedPersistenceClient) RangeDeleteReplicationTa
return p.persistence.RangeDeleteReplicationTaskFromDLQ(request)
}

// CreateFailoverMarkerTasks forwards the request to the wrapped persistence
// client when the rate limiter allows it, and returns
// ErrPersistenceLimitExceeded without calling the client otherwise.
func (p *workflowExecutionRateLimitedPersistenceClient) CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error {
	if !p.rateLimiter.Allow() {
		return ErrPersistenceLimitExceeded
	}

	return p.persistence.CreateFailoverMarkerTasks(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
Expand Down
44 changes: 44 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

const (
// emptyWorkflowID is the workflow ID passed to createReplicationTasks when
// creating failover marker tasks.
emptyWorkflowID string = ""
// emptyReplicationRunID is the fixed UUID string that is parsed and passed as
// the run ID to createReplicationTasks when creating failover marker tasks.
emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
)

type sqlExecutionManager struct {
sqlStore
shardID int
Expand Down Expand Up @@ -983,6 +989,7 @@ func (m *sqlExecutionManager) populateGetReplicationTasksResponse(
BranchToken: info.GetBranchToken(),
NewRunBranchToken: info.GetNewRunBranchToken(),
ResetWorkflow: info.GetResetWorkflow(),
CreationTime: info.GetCreationTime(),
}
}
var nextPageToken []byte
Expand Down Expand Up @@ -1095,6 +1102,43 @@ func (m *sqlExecutionManager) RangeDeleteReplicationTaskFromDLQ(
return nil
}

// CreateFailoverMarkerTasks writes the failover markers carried by request as
// replication tasks inside one transaction.
//
// Each marker is passed to createReplicationTasks with the shard ID, the
// marker's parsed domain ID, emptyWorkflowID and the parsed
// emptyReplicationRunID. On the first failure the transaction is rolled back
// (a rollback failure is only logged) and an InternalServiceError is returned.
// A commit failure is also returned as an InternalServiceError.
func (m *sqlExecutionManager) CreateFailoverMarkerTasks(
	request *p.CreateFailoverMarkersRequest,
) error {

	tx, err := m.db.BeginTx()
	if err != nil {
		// NOTE(review): this error is returned unwrapped, unlike the other
		// failure paths in this method — confirm whether that is intended.
		return err
	}

	for _, marker := range request.Markers {
		domainID := sqlplugin.MustParseUUID(marker.DomainID)
		runID := sqlplugin.MustParseUUID(emptyReplicationRunID)

		createErr := createReplicationTasks(
			tx,
			[]p.Task{marker},
			m.shardID,
			domainID,
			emptyWorkflowID,
			runID,
		)
		if createErr == nil {
			continue
		}

		// The original failure is what gets reported; a rollback failure is
		// only logged.
		if rbErr := tx.Rollback(); rbErr != nil {
			m.logger.Error("transaction rollback error", tag.Error(rbErr))
		}
		return &workflow.InternalServiceError{
			Message: fmt.Sprintf("%v: %v", "CreateFailoverMarkerTasks", createErr),
		}
	}

	commitErr := tx.Commit()
	if commitErr == nil {
		return nil
	}
	return &workflow.InternalServiceError{
		Message: fmt.Sprintf("%s operation failed. Failed to commit transaction. Error: %v", "CreateFailoverMarkerTasks", commitErr),
	}
}

type timerTaskPageToken struct {
TaskID int64
Timestamp time.Time
Expand Down
18 changes: 13 additions & 5 deletions common/persistence/sql/sqlExecutionManagerUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func applyWorkflowMutationTx(
runID,
workflowMutation.TransferTasks,
workflowMutation.ReplicationTasks,
workflowMutation.TimerTasks); err != nil {
workflowMutation.TimerTasks,
); err != nil {
return err
}

Expand Down Expand Up @@ -252,7 +253,8 @@ func applyWorkflowSnapshotTxAsReset(
runID,
workflowSnapshot.TransferTasks,
workflowSnapshot.ReplicationTasks,
workflowSnapshot.TimerTasks); err != nil {
workflowSnapshot.TimerTasks,
); err != nil {
return err
}

Expand Down Expand Up @@ -440,7 +442,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew(
runID,
workflowSnapshot.TransferTasks,
workflowSnapshot.ReplicationTasks,
workflowSnapshot.TimerTasks); err != nil {
workflowSnapshot.TimerTasks,
); err != nil {
return err
}

Expand Down Expand Up @@ -546,7 +549,8 @@ func applyTasks(
shardID,
domainID,
workflowID,
runID); err != nil {
runID,
); err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("applyTasks failed. Failed to create replication tasks. Error: %v", err),
}
Expand Down Expand Up @@ -851,8 +855,8 @@ func createReplicationTasks(
nextEventID := common.EmptyEventID
version := common.EmptyVersion
activityScheduleID := common.EmptyEventID
var lastReplicationInfo map[string]*sqlblobs.ReplicationInfo

var lastReplicationInfo map[string]*sqlblobs.ReplicationInfo
var branchToken, newRunBranchToken []byte
var resetWorkflow bool

Expand Down Expand Up @@ -880,6 +884,9 @@ func createReplicationTasks(
activityScheduleID = task.(*p.SyncActivityTask).ScheduledID
lastReplicationInfo = map[string]*sqlblobs.ReplicationInfo{}

case p.ReplicationTaskTypeFailoverMarker:
version = task.GetVersion()

default:
return &workflow.InternalServiceError{
Message: fmt.Sprintf("Unknown replication task: %v", task.GetType()),
Expand All @@ -901,6 +908,7 @@ func createReplicationTasks(
BranchToken: branchToken,
NewRunBranchToken: newRunBranchToken,
ResetWorkflow: &resetWorkflow,
CreationTime: common.Int64Ptr(task.GetVisibilityTimestamp().UnixNano()),
})
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ CREATE TYPE replication_task (
new_run_event_store_version int, -- indicates which version of event store to query for new run(continueAsNew)
new_run_branch_token blob, -- if eventV2, then query with this token for new run(continueAsNew)
reset_workflow boolean, -- whether the task is for resetWorkflowExecution
created_time bigint, -- task creation timestamp
);

CREATE TYPE timer_task (
Expand Down
Loading