Skip to content

Commit

Permalink
Failover marker persistence (cadence-workflow#3274)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored and mkolodezny committed May 29, 2020
1 parent 5ba14af commit 825fe05
Show file tree
Hide file tree
Showing 19 changed files with 244 additions and 13 deletions.
52 changes: 48 additions & 4 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ const (
PersistenceDeleteReplicationTaskFromDLQScope
// PersistenceRangeDeleteReplicationTaskFromDLQScope tracks RangeDeleteReplicationTaskFromDLQ calls made by service to persistence layer
PersistenceRangeDeleteReplicationTaskFromDLQScope
// PersistenceCreateFailoverMakerTasksScope tracks CreateFailoverMarkerTasks calls made by service to persistence layer
PersistenceCreateFailoverMakerTasksScope
// PersistenceGetTimerIndexTasksScope tracks GetTimerIndexTasks calls made by service to persistence layer
PersistenceGetTimerIndexTasksScope
// PersistenceCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1067,6 +1069,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetReplicationTasksFromDLQScope: {operation: "GetReplicationTasksFromDLQ"},
PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"},
PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"},
PersistenceCreateFailoverMakerTasksScope: {operation: "CreateFailoverMarkerTasks"},
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"},
Expand Down
14 changes: 14 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,20 @@ func (_m *ExecutionManager) RangeCompleteTimerTask(request *persistence.RangeCom
return r0
}

// CreateFailoverMarkerTasks provides a mock function with given fields: request
func (_m *ExecutionManager) CreateFailoverMarkerTasks(request *persistence.CreateFailoverMarkersRequest) error {
	ret := _m.Called(request)

	// If the first configured return value is a function with the matching
	// signature, invoke it; otherwise use the configured error value.
	if rf, ok := ret.Get(0).(func(*persistence.CreateFailoverMarkersRequest) error); ok {
		return rf(request)
	}
	return ret.Error(0)
}

// Close provides a mock function with given fields:
func (_m *ExecutionManager) Close() {
_m.Called()
Expand Down
37 changes: 36 additions & 1 deletion common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ const (
`branch_token: ?, ` +
`reset_workflow: ?, ` +
`new_run_event_store_version: ?, ` +
`new_run_branch_token: ? ` +
`new_run_branch_token: ?, ` +
`created_time: ? ` +
`}`

templateTimerTaskType = `{` +
Expand Down Expand Up @@ -2801,6 +2802,7 @@ func (d *cassandraPersistence) PutReplicationTaskToDLQ(request *p.PutReplication
p.EventStoreVersion,
task.NewRunBranchToken,
defaultVisibilityTimestamp,
defaultVisibilityTimestamp,
task.GetTaskID())

err := query.Exec()
Expand Down Expand Up @@ -2892,3 +2894,36 @@ func (d *cassandraPersistence) RangeDeleteReplicationTaskFromDLQ(
}
return nil
}

// CreateFailoverMarkerTasks passes every marker in the request to
// createReplicationTasks against one logged batch and then executes that batch.
//
// An error from createReplicationTasks is returned unchanged. A batch execution
// failure is returned as ServiceBusyError when isThrottlingError reports true
// for it, and as InternalServiceError otherwise.
func (d *cassandraPersistence) CreateFailoverMarkerTasks(
	request *p.CreateFailoverMarkersRequest,
) error {

	batch := d.session.NewBatch(gocql.LoggedBatch)
	for _, marker := range request.Markers {
		err := createReplicationTasks(
			batch,
			[]p.Task{marker},
			d.shardID,
			marker.DomainID,
			rowTypeReplicationWorkflowID,
			rowTypeReplicationRunID,
		)
		if err != nil {
			return err
		}
	}

	if err := d.session.ExecuteBatch(batch); err != nil {
		// Both error kinds carry the same message; only the type differs.
		message := fmt.Sprintf("CreateFailoverMarkerTasks operation failed. Error: %v", err)
		if isThrottlingError(err) {
			return &workflow.ServiceBusyError{Message: message}
		}
		return &workflow.InternalServiceError{Message: message}
	}
	return nil
}
4 changes: 4 additions & 0 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,9 @@ func createReplicationTasks(
// cassandra does not like null
lastReplicationInfo = make(map[string]map[string]interface{})

case p.ReplicationTaskTypeFailoverMarker:
version = task.GetVersion()

default:
return &workflow.InternalServiceError{
Message: fmt.Sprintf("Unknow replication type: %v", task.GetType()),
Expand Down Expand Up @@ -1113,6 +1116,7 @@ func createReplicationTasks(
resetWorkflow,
p.EventStoreVersion,
newRunBranchToken,
task.GetVisibilityTimestamp().UnixNano(),
defaultVisibilityTimestamp,
task.GetTaskID())
}
Expand Down
8 changes: 7 additions & 1 deletion common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ type (
BranchToken []byte
NewRunBranchToken []byte
ResetWorkflow bool
CreationTime int64

// TODO deprecate when NDC is fully released && migrated
LastReplicationInfo map[string]*ReplicationInfo
Expand Down Expand Up @@ -611,7 +612,6 @@ type (
TaskID int64
VisibilityTimestamp time.Time
Version int64
SourceCluster string
DomainID string
}

Expand Down Expand Up @@ -1474,6 +1474,11 @@ type (
Branches []HistoryBranchDetail
}

// CreateFailoverMarkersRequest is the request used to create failover marker tasks
CreateFailoverMarkersRequest struct {
// Markers holds the failover marker tasks to create
Markers []*FailoverMarkerTask
}

// Closeable is an interface for any entity that supports a close operation to release resources
Closeable interface {
Close()
Expand Down Expand Up @@ -1516,6 +1521,7 @@ type (
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error

// Timer related methods.
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,12 @@ func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
return m.persistence.RangeDeleteReplicationTaskFromDLQ(request)
}

// CreateFailoverMarkerTasks forwards the request to the underlying persistence
// store and returns its error unchanged.
func (m *executionManagerImpl) CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error {
	return m.persistence.CreateFailoverMarkerTasks(request)
}

// Timer related methods.
func (m *executionManagerImpl) GetTimerIndexTasks(
request *GetTimerIndexTasksRequest,
Expand Down
23 changes: 23 additions & 0 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5280,6 +5280,29 @@ func (s *ExecutionManagerSuite) TestReplicationDLQ() {
s.Len(resp.Tasks, 0)
}

// TestCreateFailoverMarkerTasks verifies that a failover marker written through
// CreateFailoverMarkers is returned by GetReplicationTasks with its version,
// task ID, domain ID and task type intact.
func (s *ExecutionManagerSuite) TestCreateFailoverMarkerTasks() {
	domainID := uuid.New()
	markers := []*p.FailoverMarkerTask{
		{
			TaskID:              1,
			VisibilityTimestamp: time.Now(),
			DomainID:            domainID,
			Version:             1,
		},
	}
	err := s.CreateFailoverMarkers(markers)
	s.NoError(err)

	tasks, err := s.GetReplicationTasks(1, true)
	s.NoError(err)
	// Assertions take (expected, actual); keeping that order makes the
	// failure output label the two values correctly.
	s.Len(tasks, 1)
	s.Equal(int64(1), tasks[0].Version)
	s.Equal(int64(1), tasks[0].TaskID)
	s.Equal(domainID, tasks[0].DomainID)
	s.Equal(p.ReplicationTaskTypeFailoverMarker, tasks[0].TaskType)
}

func copyWorkflowExecutionInfo(sourceInfo *p.WorkflowExecutionInfo) *p.WorkflowExecutionInfo {
return &p.WorkflowExecutionInfo{
DomainID: sourceInfo.DomainID,
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,16 @@ func (s *TestBase) RangeDeleteReplicationTaskFromDLQ(
})
}

// CreateFailoverMarkers is a utility method that wraps the given markers in a
// CreateFailoverMarkersRequest and hands it to the execution manager.
func (s *TestBase) CreateFailoverMarkers(
	markers []*p.FailoverMarkerTask,
) error {
	request := &p.CreateFailoverMarkersRequest{Markers: markers}
	return s.ExecutionManager.CreateFailoverMarkerTasks(request)
}

// CompleteTransferTask is a utility method to complete a transfer task
func (s *TestBase) CompleteTransferTask(taskID int64) error {

Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type (
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
DeleteReplicationTaskFromDLQ(request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(request *RangeDeleteReplicationTaskFromDLQRequest) error
CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error

// Timer related methods.
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
Expand Down
14 changes: 14 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,20 @@ func (p *workflowExecutionPersistenceClient) RangeDeleteReplicationTaskFromDLQ(
return nil
}

// CreateFailoverMarkerTasks delegates to the wrapped persistence client while
// emitting a request counter, a latency timer, and a failure counter when the
// delegate returns an error.
func (p *workflowExecutionPersistenceClient) CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error {
	const scope = metrics.PersistenceCreateFailoverMakerTasksScope

	p.metricClient.IncCounter(scope, metrics.PersistenceRequests)

	// The timer is stopped right after the call, before the failure counter is touched.
	stopwatch := p.metricClient.StartTimer(scope, metrics.PersistenceLatency)
	err := p.persistence.CreateFailoverMarkerTasks(request)
	stopwatch.Stop()

	if err != nil {
		p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
	}
	return err
}

func (p *workflowExecutionPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceRequests)

Expand Down
9 changes: 9 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,15 @@ func (p *workflowExecutionRateLimitedPersistenceClient) RangeDeleteReplicationTa
return p.persistence.RangeDeleteReplicationTaskFromDLQ(request)
}

// CreateFailoverMarkerTasks returns ErrPersistenceLimitExceeded when the rate
// limiter rejects the call; otherwise it delegates to the wrapped persistence
// client and returns its error.
func (p *workflowExecutionRateLimitedPersistenceClient) CreateFailoverMarkerTasks(request *CreateFailoverMarkersRequest) error {
	if !p.rateLimiter.Allow() {
		return ErrPersistenceLimitExceeded
	}
	return p.persistence.CreateFailoverMarkerTasks(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
Expand Down
44 changes: 44 additions & 0 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)

const (
// emptyWorkflowID is the workflow ID that CreateFailoverMarkerTasks passes to
// createReplicationTasks.
emptyWorkflowID string = ""
// emptyReplicationRunID is the run ID that CreateFailoverMarkerTasks parses as
// a UUID and passes to createReplicationTasks.
// NOTE(review): presumably a fixed placeholder because failover markers are not tied to a workflow run — confirm.
emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
)

type sqlExecutionManager struct {
sqlStore
shardID int
Expand Down Expand Up @@ -983,6 +989,7 @@ func (m *sqlExecutionManager) populateGetReplicationTasksResponse(
BranchToken: info.GetBranchToken(),
NewRunBranchToken: info.GetNewRunBranchToken(),
ResetWorkflow: info.GetResetWorkflow(),
CreationTime: info.GetCreationTime(),
}
}
var nextPageToken []byte
Expand Down Expand Up @@ -1095,6 +1102,43 @@ func (m *sqlExecutionManager) RangeDeleteReplicationTaskFromDLQ(
return nil
}

// CreateFailoverMarkerTasks writes every failover marker in the request as a
// replication task inside a single transaction.
//
// Every failure (starting the transaction, creating a task, committing) is
// returned as an InternalServiceError. When creating a task fails the
// transaction is rolled back; a rollback failure is logged and not returned.
func (m *sqlExecutionManager) CreateFailoverMarkerTasks(
	request *p.CreateFailoverMarkersRequest,
) error {

	// The placeholder run ID is the same for every marker, so parse it once.
	runID := sqlplugin.MustParseUUID(emptyReplicationRunID)

	tx, err := m.db.BeginTx()
	if err != nil {
		// Wrap like the other failure paths so callers do not receive a raw
		// database error from this method.
		return &workflow.InternalServiceError{
			Message: fmt.Sprintf("%s operation failed. Failed to start transaction. Error: %v", "CreateFailoverMarkerTasks", err),
		}
	}

	for _, task := range request.Markers {
		// NOTE(review): MustParseUUID presumably panics on a malformed DomainID — confirm callers validate it.
		if err := createReplicationTasks(
			tx,
			[]p.Task{task},
			m.shardID,
			sqlplugin.MustParseUUID(task.DomainID),
			emptyWorkflowID,
			runID,
		); err != nil {
			if rollBackErr := tx.Rollback(); rollBackErr != nil {
				m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
			}

			return &workflow.InternalServiceError{
				Message: fmt.Sprintf("%v: %v", "CreateFailoverMarkerTasks", err),
			}
		}
	}
	if err := tx.Commit(); err != nil {
		return &workflow.InternalServiceError{
			Message: fmt.Sprintf("%s operation failed. Failed to commit transaction. Error: %v", "CreateFailoverMarkerTasks", err),
		}
	}
	return nil
}

type timerTaskPageToken struct {
TaskID int64
Timestamp time.Time
Expand Down
18 changes: 13 additions & 5 deletions common/persistence/sql/sqlExecutionManagerUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func applyWorkflowMutationTx(
runID,
workflowMutation.TransferTasks,
workflowMutation.ReplicationTasks,
workflowMutation.TimerTasks); err != nil {
workflowMutation.TimerTasks,
); err != nil {
return err
}

Expand Down Expand Up @@ -252,7 +253,8 @@ func applyWorkflowSnapshotTxAsReset(
runID,
workflowSnapshot.TransferTasks,
workflowSnapshot.ReplicationTasks,
workflowSnapshot.TimerTasks); err != nil {
workflowSnapshot.TimerTasks,
); err != nil {
return err
}

Expand Down Expand Up @@ -440,7 +442,8 @@ func (m *sqlExecutionManager) applyWorkflowSnapshotTxAsNew(
runID,
workflowSnapshot.TransferTasks,
workflowSnapshot.ReplicationTasks,
workflowSnapshot.TimerTasks); err != nil {
workflowSnapshot.TimerTasks,
); err != nil {
return err
}

Expand Down Expand Up @@ -546,7 +549,8 @@ func applyTasks(
shardID,
domainID,
workflowID,
runID); err != nil {
runID,
); err != nil {
return &workflow.InternalServiceError{
Message: fmt.Sprintf("applyTasks failed. Failed to create replication tasks. Error: %v", err),
}
Expand Down Expand Up @@ -851,8 +855,8 @@ func createReplicationTasks(
nextEventID := common.EmptyEventID
version := common.EmptyVersion
activityScheduleID := common.EmptyEventID
var lastReplicationInfo map[string]*sqlblobs.ReplicationInfo

var lastReplicationInfo map[string]*sqlblobs.ReplicationInfo
var branchToken, newRunBranchToken []byte
var resetWorkflow bool

Expand Down Expand Up @@ -880,6 +884,9 @@ func createReplicationTasks(
activityScheduleID = task.(*p.SyncActivityTask).ScheduledID
lastReplicationInfo = map[string]*sqlblobs.ReplicationInfo{}

case p.ReplicationTaskTypeFailoverMarker:
version = task.GetVersion()

default:
return &workflow.InternalServiceError{
Message: fmt.Sprintf("Unknown replication task: %v", task.GetType()),
Expand All @@ -901,6 +908,7 @@ func createReplicationTasks(
BranchToken: branchToken,
NewRunBranchToken: newRunBranchToken,
ResetWorkflow: &resetWorkflow,
CreationTime: common.Int64Ptr(task.GetVisibilityTimestamp().UnixNano()),
})
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 805b26 to 365a08
1 change: 1 addition & 0 deletions schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ CREATE TYPE replication_task (
new_run_event_store_version int, -- indicates which version of event store to query for new run(continueAsNew)
new_run_branch_token blob, -- if eventV2, then query with this token for new run(continueAsNew)
reset_workflow boolean, -- whether the task is for resetWorkflowExecution
created_time bigint, -- task creation timestamp
);

CREATE TYPE timer_task (
Expand Down
Loading

0 comments on commit 825fe05

Please sign in to comment.