From 871aa7bd0dbf05e2ab7a6074a88e217f230decc5 Mon Sep 17 00:00:00 2001 From: Yu Xia Date: Thu, 3 Dec 2020 21:41:45 -1000 Subject: [PATCH] Add admin CLI scan command for unsupported workflows (#3824) --- Makefile | 6 + RELEASES.md | 50 +++ cmd/tools/cli/main.go | 2 + common/persistence/sql/sqlExecutionManager.go | 304 ++++++++++++------ .../persistence/sql/sqlplugin/interfaces.go | 6 +- .../sql/sqlplugin/mysql/execution.go | 26 +- .../sql/sqlplugin/postgres/execution.go | 26 +- tools/cli/admin.go | 57 +++- tools/cli/adminCommands.go | 41 +++ tools/cli/adminDBScanCommand.go | 154 ++++++++- tools/cli/adminKafkaCommands.go | 1 + tools/cli/app.go | 2 +- tools/cli/flags.go | 4 + 13 files changed, 565 insertions(+), 114 deletions(-) create mode 100644 RELEASES.md diff --git a/Makefile b/Makefile index 18b295f3f4f..6d49ef57aef 100644 --- a/Makefile +++ b/Makefile @@ -330,3 +330,9 @@ gen-internal-types: go run common/types/generator/main.go internal-types: gen-internal-types fmt copyright + +start-mysql: bins + ./cadence-server --zone mysql start + +start-postgres: bins + ./cadence-server --zone postgres start \ No newline at end of file diff --git a/RELEASES.md b/RELEASES.md new file mode 100644 index 00000000000..ff37be31863 --- /dev/null +++ b/RELEASES.md @@ -0,0 +1,50 @@ +# Releases upgrade instruction + +## Upgrade to 0.16 and above + +**TL;DR:** If your Cadence service is running on or above 0.14.0 and you do not have workflows running for more than 6 months, it is safe to upgrade without any operations. + +If your cluster has open workflows for more than 6 months, please download and run the following CLI command prior to the upgrade. + +Prior to release 0.16, the workflow state structure contains a field called replication state. Instead, we introduce a new field 'version histories' +to manage the replication. In release 0.16, this field 'replication state' has been removed from the code path. +From release 0.14, the field 'replication state' has been deprecated. 
If you are upgrading the service from a version below 0.14.0 to 0.16.0 or +you may have workflows running for more than 6 months, please follow the instructions below to see if you have any unsupported workflows. + +## Detect unsupported open workflows + +Run the following command based on your database. + +Cassandra: + +`cadence admin db unsupported-workflow --db_type=cassandra --db_address <db_address> --db_port <db_port> --username=<username> --password=<password> --keyspace <keyspace> --lower_shard_bound=<lower_shard_bound> --upper_shard_bound=<upper_shard_bound> --rps <rps> --output_filename ./cadence_scan` + +MySQL/Postgres: + +`cadence admin db unsupported-workflow --db_type=<mysql|postgres> --db_address <db_address> --db_port <db_port> --username=<username> --password=<password> --db_name <db_name> --lower_shard_bound=<lower_shard_bound> --upper_shard_bound=<upper_shard_bound> --rps <rps> --output_filename ./cadence_scan` + +**Note:** This CLI is a long-running process to scan the database for unsupported workflows. +If you have TLS configurations or use a customized encoding/decoding type, please use + +`cadence admin db unsupported-workflow --help` + +to configure the correct connection. + + +After the CLI completes, the unsupported workflows will be listed in the file `cadence_scan`. + +For example: + +`cadence --address <host>:<port> --domain <domain_uuid> workflow reset --wid helloworld_7f2fa1fe-594e-44ad-a9e2-7ac7c5f97861 --rid e79ce972-b657-48a4-bba9-e2f2ac6424e2 --reset_type LastDecisionCompleted --reason 'release 0.16 upgrade'` + +## Reset the workflow + +Use the reset CLI to reset these unsupported workflows. This operation migrates the data of these unsupported workflows. + +1. Replace the host and port with your cadence host address and port number. +2. Replace the domain uuid with the domain name. +To find the domain name, you can use `cadence --address <host>:<port> domain describe --domain_id=<domain_uuid>`. + +After replacing the fields with the correct values, you can copy/paste to run the CLI. This is going to reset those workflows to the last decision task completed event. 
+ +**Note:** If your workflows has outstanding child workflows, you have to wait for these child workflows completion or terminate the workflows. diff --git a/cmd/tools/cli/main.go b/cmd/tools/cli/main.go index a5924f71b85..f7d27340c0f 100644 --- a/cmd/tools/cli/main.go +++ b/cmd/tools/cli/main.go @@ -24,6 +24,8 @@ import ( "fmt" "os" + _ "github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql" // needed to load mysql plugin + _ "github.com/uber/cadence/common/persistence/sql/sqlplugin/postgres" // needed to load postgres plugin "github.com/uber/cadence/tools/cli" ) diff --git a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index 9606dd74a29..14ef68bb100 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -225,7 +225,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution( domainID := serialization.MustParseUUID(request.DomainID) runID := serialization.MustParseUUID(*request.Execution.RunID) wfID := *request.Execution.WorkflowID - execution, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{ + executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{ ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID}) if err != nil { @@ -243,103 +243,27 @@ func (m *sqlExecutionManager) GetWorkflowExecution( } } - info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding) - if err != nil { - return nil, err - } - - var state p.InternalWorkflowMutableState - state.ExecutionInfo = &p.InternalWorkflowExecutionInfo{ - DomainID: execution.DomainID.String(), - WorkflowID: execution.WorkflowID, - RunID: execution.RunID.String(), - NextEventID: execution.NextEventID, - TaskList: info.GetTaskList(), - WorkflowTypeName: info.GetWorkflowTypeName(), - WorkflowTimeout: common.SecondsToDuration(int64(info.GetWorkflowTimeoutSeconds())), - DecisionStartToCloseTimeout: 
common.SecondsToDuration(int64(info.GetDecisionTaskTimeoutSeconds())), - State: int(info.GetState()), - CloseStatus: int(info.GetCloseStatus()), - LastFirstEventID: info.GetLastFirstEventID(), - LastProcessedEvent: info.GetLastProcessedEvent(), - StartTimestamp: time.Unix(0, info.GetStartTimeNanos()), - LastUpdatedTimestamp: time.Unix(0, info.GetLastUpdatedTimeNanos()), - CreateRequestID: info.GetCreateRequestID(), - DecisionVersion: info.GetDecisionVersion(), - DecisionScheduleID: info.GetDecisionScheduleID(), - DecisionStartedID: info.GetDecisionStartedID(), - DecisionRequestID: info.GetDecisionRequestID(), - DecisionTimeout: common.SecondsToDuration(int64(info.GetDecisionTimeout())), - DecisionAttempt: info.GetDecisionAttempt(), - DecisionStartedTimestamp: time.Unix(0, info.GetDecisionStartedTimestampNanos()), - DecisionScheduledTimestamp: time.Unix(0, info.GetDecisionScheduledTimestampNanos()), - DecisionOriginalScheduledTimestamp: time.Unix(0, info.GetDecisionOriginalScheduledTimestampNanos()), - StickyTaskList: info.GetStickyTaskList(), - StickyScheduleToStartTimeout: common.SecondsToDuration(info.GetStickyScheduleToStartTimeout()), - ClientLibraryVersion: info.GetClientLibraryVersion(), - ClientFeatureVersion: info.GetClientFeatureVersion(), - ClientImpl: info.GetClientImpl(), - SignalCount: int32(info.GetSignalCount()), - HistorySize: info.GetHistorySize(), - CronSchedule: info.GetCronSchedule(), - CompletionEventBatchID: common.EmptyEventID, - HasRetryPolicy: info.GetHasRetryPolicy(), - Attempt: int32(info.GetRetryAttempt()), - InitialInterval: common.SecondsToDuration(int64(info.GetRetryInitialIntervalSeconds())), - BackoffCoefficient: info.GetRetryBackoffCoefficient(), - MaximumInterval: common.SecondsToDuration(int64(info.GetRetryMaximumIntervalSeconds())), - MaximumAttempts: info.GetRetryMaximumAttempts(), - ExpirationSeconds: common.SecondsToDuration(int64(info.GetRetryExpirationSeconds())), - ExpirationTime: time.Unix(0, 
info.GetRetryExpirationTimeNanos()), - BranchToken: info.GetEventBranchToken(), - ExecutionContext: info.GetExecutionContext(), - NonRetriableErrors: info.GetRetryNonRetryableErrors(), - SearchAttributes: info.GetSearchAttributes(), - Memo: info.GetMemo(), - } - - // TODO: remove this after all 2DC workflows complete - if info.LastWriteEventID != nil { - state.ReplicationState = &p.ReplicationState{} - state.ReplicationState.StartVersion = info.GetStartVersion() - state.ReplicationState.LastWriteVersion = execution.LastWriteVersion - state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID() - } - - if info.GetVersionHistories() != nil { - state.VersionHistories = p.NewDataBlob( - info.GetVersionHistories(), - common.EncodingType(info.GetVersionHistoriesEncoding()), - ) - } - - if info.ParentDomainID != nil { - state.ExecutionInfo.ParentDomainID = serialization.UUID(info.ParentDomainID).String() - state.ExecutionInfo.ParentWorkflowID = info.GetParentWorkflowID() - state.ExecutionInfo.ParentRunID = serialization.UUID(info.ParentRunID).String() - state.ExecutionInfo.InitiatedID = info.GetInitiatedID() - if state.ExecutionInfo.CompletionEvent != nil { - state.ExecutionInfo.CompletionEvent = nil + if len(executions) == 0 { + return nil, &types.EntityNotExistsError{ + Message: fmt.Sprintf( + "Workflow execution not found. 
WorkflowId: %v, RunId: %v", + request.Execution.GetWorkflowID(), + request.Execution.GetRunID(), + ), } } - if info.GetCancelRequested() { - state.ExecutionInfo.CancelRequested = true - state.ExecutionInfo.CancelRequestID = info.GetCancelRequestID() - } - - if info.CompletionEventBatchID != nil { - state.ExecutionInfo.CompletionEventBatchID = info.GetCompletionEventBatchID() - } - - if info.CompletionEvent != nil { - state.ExecutionInfo.CompletionEvent = p.NewDataBlob(info.CompletionEvent, - common.EncodingType(info.GetCompletionEventEncoding())) + if len(executions) != 1 { + return nil, &types.InternalServiceError{ + Message: fmt.Sprintf("GetWorkflowExecution return more than one results."), + } } - if info.AutoResetPoints != nil { - state.ExecutionInfo.AutoResetPoints = p.NewDataBlob(info.AutoResetPoints, - common.EncodingType(info.GetAutoResetPointsEncoding())) + state, err := m.populateWorkflowMutableState(executions[0]) + if err != nil { + return nil, &types.InternalServiceError{ + Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err), + } } { @@ -459,7 +383,7 @@ func (m *sqlExecutionManager) GetWorkflowExecution( } } - return &p.InternalGetWorkflowExecutionResponse{State: &state}, nil + return &p.InternalGetWorkflowExecutionResponse{State: state}, nil } func (m *sqlExecutionManager) UpdateWorkflowExecution( @@ -874,10 +798,63 @@ func (m *sqlExecutionManager) IsWorkflowExecutionExists( } func (m *sqlExecutionManager) ListConcreteExecutions( - _ context.Context, - _ *p.ListConcreteExecutionsRequest, + ctx context.Context, + request *p.ListConcreteExecutionsRequest, ) (*p.InternalListConcreteExecutionsResponse, error) { - return nil, &types.InternalServiceError{Message: "Not yet implemented"} + + filter := &sqlplugin.ExecutionsFilter{} + if len(request.PageToken) > 0 { + err := gobDeserialize(request.PageToken, &filter) + if err != nil { + return nil, &types.InternalServiceError{ + Message: fmt.Sprintf("ListConcreteExecutions failed. 
Error: %v", err), + } + } + } else { + filter = &sqlplugin.ExecutionsFilter{ + ShardID: m.shardID, + WorkflowID: "", + RunID: serialization.MustParseUUID(minUUID), + } + } + filter.Size = request.PageSize + + executions, err := m.db.SelectFromExecutions(ctx, filter) + if err != nil { + if err == sql.ErrNoRows { + return &p.InternalListConcreteExecutionsResponse{}, nil + } + return nil, &types.InternalServiceError{ + Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err), + } + } + + if len(executions) == 0 { + return &p.InternalListConcreteExecutionsResponse{}, nil + } + lastExecution := executions[len(executions)-1] + nextFilter := &sqlplugin.ExecutionsFilter{ + ShardID: m.shardID, + WorkflowID: lastExecution.WorkflowID, + RunID: lastExecution.RunID, + } + token, err := gobSerialize(nextFilter) + if err != nil { + return nil, &types.InternalServiceError{ + Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err), + } + } + concreteExecutions, err := m.populateInternalListConcreteExecutions(executions) + if err != nil { + return nil, &types.InternalServiceError{ + Message: fmt.Sprintf("ListConcreteExecutions failed. 
Error: %v", err), + } + } + + return &p.InternalListConcreteExecutionsResponse{ + Executions: concreteExecutions, + NextPageToken: token, + }, nil } func (m *sqlExecutionManager) GetTransferTasks( @@ -1354,3 +1331,136 @@ func (m *sqlExecutionManager) PutReplicationTaskToDLQ( return nil } + +func (m *sqlExecutionManager) populateWorkflowMutableState( + execution sqlplugin.ExecutionsRow, +) (*p.InternalWorkflowMutableState, error) { + + info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding) + if err != nil { + return nil, err + } + + state := &p.InternalWorkflowMutableState{} + state.ExecutionInfo = &p.InternalWorkflowExecutionInfo{ + DomainID: execution.DomainID.String(), + WorkflowID: execution.WorkflowID, + RunID: execution.RunID.String(), + NextEventID: execution.NextEventID, + TaskList: info.GetTaskList(), + WorkflowTypeName: info.GetWorkflowTypeName(), + WorkflowTimeout: common.SecondsToDuration(int64(info.GetWorkflowTimeoutSeconds())), + DecisionStartToCloseTimeout: common.SecondsToDuration(int64(info.GetDecisionTaskTimeoutSeconds())), + State: int(info.GetState()), + CloseStatus: int(info.GetCloseStatus()), + LastFirstEventID: info.GetLastFirstEventID(), + LastProcessedEvent: info.GetLastProcessedEvent(), + StartTimestamp: time.Unix(0, info.GetStartTimeNanos()), + LastUpdatedTimestamp: time.Unix(0, info.GetLastUpdatedTimeNanos()), + CreateRequestID: info.GetCreateRequestID(), + DecisionVersion: info.GetDecisionVersion(), + DecisionScheduleID: info.GetDecisionScheduleID(), + DecisionStartedID: info.GetDecisionStartedID(), + DecisionRequestID: info.GetDecisionRequestID(), + DecisionTimeout: common.SecondsToDuration(int64(info.GetDecisionTimeout())), + DecisionAttempt: info.GetDecisionAttempt(), + DecisionStartedTimestamp: time.Unix(0, info.GetDecisionStartedTimestampNanos()), + DecisionScheduledTimestamp: time.Unix(0, info.GetDecisionScheduledTimestampNanos()), + DecisionOriginalScheduledTimestamp: time.Unix(0, 
info.GetDecisionOriginalScheduledTimestampNanos()), + StickyTaskList: info.GetStickyTaskList(), + StickyScheduleToStartTimeout: common.SecondsToDuration(info.GetStickyScheduleToStartTimeout()), + ClientLibraryVersion: info.GetClientLibraryVersion(), + ClientFeatureVersion: info.GetClientFeatureVersion(), + ClientImpl: info.GetClientImpl(), + SignalCount: int32(info.GetSignalCount()), + HistorySize: info.GetHistorySize(), + CronSchedule: info.GetCronSchedule(), + CompletionEventBatchID: common.EmptyEventID, + HasRetryPolicy: info.GetHasRetryPolicy(), + Attempt: int32(info.GetRetryAttempt()), + InitialInterval: common.SecondsToDuration(int64(info.GetRetryInitialIntervalSeconds())), + BackoffCoefficient: info.GetRetryBackoffCoefficient(), + MaximumInterval: common.SecondsToDuration(int64(info.GetRetryMaximumIntervalSeconds())), + MaximumAttempts: info.GetRetryMaximumAttempts(), + ExpirationSeconds: common.SecondsToDuration(int64(info.GetRetryExpirationSeconds())), + ExpirationTime: time.Unix(0, info.GetRetryExpirationTimeNanos()), + BranchToken: info.GetEventBranchToken(), + ExecutionContext: info.GetExecutionContext(), + NonRetriableErrors: info.GetRetryNonRetryableErrors(), + SearchAttributes: info.GetSearchAttributes(), + Memo: info.GetMemo(), + } + + // TODO: remove this after all 2DC workflows complete + if info.LastWriteEventID != nil { + state.ReplicationState = &p.ReplicationState{} + state.ReplicationState.StartVersion = info.GetStartVersion() + state.ReplicationState.LastWriteVersion = execution.LastWriteVersion + state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID() + } + + if info.GetVersionHistories() != nil { + state.VersionHistories = p.NewDataBlob( + info.GetVersionHistories(), + common.EncodingType(info.GetVersionHistoriesEncoding()), + ) + } + + if info.ParentDomainID != nil { + state.ExecutionInfo.ParentDomainID = serialization.UUID(info.ParentDomainID).String() + state.ExecutionInfo.ParentWorkflowID = info.GetParentWorkflowID() + 
state.ExecutionInfo.ParentRunID = serialization.UUID(info.ParentRunID).String() + state.ExecutionInfo.InitiatedID = info.GetInitiatedID() + if state.ExecutionInfo.CompletionEvent != nil { + state.ExecutionInfo.CompletionEvent = nil + } + } + + if info.GetCancelRequested() { + state.ExecutionInfo.CancelRequested = true + state.ExecutionInfo.CancelRequestID = info.GetCancelRequestID() + } + + if info.CompletionEventBatchID != nil { + state.ExecutionInfo.CompletionEventBatchID = info.GetCompletionEventBatchID() + } + + if info.CompletionEvent != nil { + state.ExecutionInfo.CompletionEvent = p.NewDataBlob(info.CompletionEvent, + common.EncodingType(info.GetCompletionEventEncoding())) + } + + if info.AutoResetPoints != nil { + state.ExecutionInfo.AutoResetPoints = p.NewDataBlob(info.AutoResetPoints, + common.EncodingType(info.GetAutoResetPointsEncoding())) + } + return state, nil +} + +func (m *sqlExecutionManager) populateInternalListConcreteExecutions( + executions []sqlplugin.ExecutionsRow, +) ([]*p.InternalListConcreteExecutionsEntity, error) { + + concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions)) + for _, execution := range executions { + mutableState, err := m.populateWorkflowMutableState(execution) + if err != nil { + return nil, err + } + + var versionHistories *p.DataBlob + if mutableState.VersionHistories != nil { + versionHistories = p.NewDataBlob( + mutableState.VersionHistories.Data, + mutableState.VersionHistories.Encoding, + ) + } + + concreteExecution := &p.InternalListConcreteExecutionsEntity{ + ExecutionInfo: mutableState.ExecutionInfo, + VersionHistories: versionHistories, + } + concreteExecutions = append(concreteExecutions, concreteExecution) + } + return concreteExecutions, nil +} diff --git a/common/persistence/sql/sqlplugin/interfaces.go b/common/persistence/sql/sqlplugin/interfaces.go index c8081262f20..2f92917bb37 100644 --- a/common/persistence/sql/sqlplugin/interfaces.go +++ b/common/persistence/sql/sqlplugin/interfaces.go @@ 
-116,11 +116,15 @@ type ( // ExecutionsFilter contains the column names within executions table that // can be used to filter results through a WHERE clause + // To get single row, it requires ShardID, DomainID, WorkflowID, RunID + // To get a list of rows, it requires ShardID, Size. + // The WorkflowID and RunID are optional for listing rows. They work as the start boundary for pagination. ExecutionsFilter struct { ShardID int DomainID serialization.UUID WorkflowID string RunID serialization.UUID + Size int } // CurrentExecutionsRow represents a row in current_executions table @@ -581,7 +585,7 @@ type ( InsertIntoExecutions(ctx context.Context, row *ExecutionsRow) (sql.Result, error) UpdateExecutions(ctx context.Context, row *ExecutionsRow) (sql.Result, error) - SelectFromExecutions(ctx context.Context, filter *ExecutionsFilter) (*ExecutionsRow, error) + SelectFromExecutions(ctx context.Context, filter *ExecutionsFilter) ([]ExecutionsRow, error) DeleteFromExecutions(ctx context.Context, filter *ExecutionsFilter) (sql.Result, error) ReadLockExecutions(ctx context.Context, filter *ExecutionsFilter) (int, error) WriteLockExecutions(ctx context.Context, filter *ExecutionsFilter) (int, error) diff --git a/common/persistence/sql/sqlplugin/mysql/execution.go b/common/persistence/sql/sqlplugin/mysql/execution.go index 6e683de5e8a..865833c6e16 100644 --- a/common/persistence/sql/sqlplugin/mysql/execution.go +++ b/common/persistence/sql/sqlplugin/mysql/execution.go @@ -40,6 +40,9 @@ const ( getExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions WHERE shard_id = ? AND domain_id = ? AND workflow_id = ? AND run_id = ?` + listExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions + WHERE shard_id = ? AND (workflow_id, run_id) > (?, ?) ORDER BY workflow_id, run_id LIMIT ?` + deleteExecutionQuery = `DELETE FROM executions WHERE shard_id = ? AND domain_id = ? AND workflow_id = ? 
AND run_id = ?` @@ -170,13 +173,24 @@ func (mdb *db) UpdateExecutions(ctx context.Context, row *sqlplugin.ExecutionsRo } // SelectFromExecutions reads a single row from executions table -func (mdb *db) SelectFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (*sqlplugin.ExecutionsRow, error) { - var row sqlplugin.ExecutionsRow - err := mdb.conn.GetContext(ctx, &row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) - if err != nil { - return nil, err +func (mdb *db) SelectFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) ([]sqlplugin.ExecutionsRow, error) { + var rows []sqlplugin.ExecutionsRow + var err error + if len(filter.DomainID) == 0 && filter.Size > 0 { + err = mdb.conn.SelectContext(ctx, &rows, listExecutionQuery, filter.ShardID, filter.WorkflowID, filter.RunID, filter.Size) + if err != nil { + return nil, err + } + } else { + var row sqlplugin.ExecutionsRow + err = mdb.conn.GetContext(ctx, &row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + if err != nil { + return nil, err + } + rows = append(rows, row) } - return &row, err + + return rows, err } // DeleteFromExecutions deletes a single row from executions table diff --git a/common/persistence/sql/sqlplugin/postgres/execution.go b/common/persistence/sql/sqlplugin/postgres/execution.go index f756716d4f2..f37d02e0c82 100644 --- a/common/persistence/sql/sqlplugin/postgres/execution.go +++ b/common/persistence/sql/sqlplugin/postgres/execution.go @@ -40,6 +40,9 @@ const ( getExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 AND run_id = $4` + listExecutionQuery = `SELECT ` + executionsColumns + ` FROM executions + WHERE shard_id = $1 AND (workflow_id, run_id) > ($2, $3) ORDER BY workflow_id, run_id LIMIT $4` + deleteExecutionQuery = `DELETE FROM executions WHERE shard_id = $1 AND domain_id = $2 AND workflow_id = $3 
AND run_id = $4` @@ -168,13 +171,24 @@ func (pdb *db) UpdateExecutions(ctx context.Context, row *sqlplugin.ExecutionsRo } // SelectFromExecutions reads a single row from executions table -func (pdb *db) SelectFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) (*sqlplugin.ExecutionsRow, error) { - var row sqlplugin.ExecutionsRow - err := pdb.conn.GetContext(ctx, &row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) - if err != nil { - return nil, err +func (pdb *db) SelectFromExecutions(ctx context.Context, filter *sqlplugin.ExecutionsFilter) ([]sqlplugin.ExecutionsRow, error) { + var rows []sqlplugin.ExecutionsRow + var err error + if len(filter.DomainID) == 0 && filter.Size > 0 { + err = pdb.conn.SelectContext(ctx, &rows, listExecutionQuery, filter.ShardID, filter.WorkflowID, filter.RunID, filter.Size) + if err != nil { + return nil, err + } + } else { + var row sqlplugin.ExecutionsRow + err = pdb.conn.GetContext(ctx, &row, getExecutionQuery, filter.ShardID, filter.DomainID, filter.WorkflowID, filter.RunID) + if err != nil { + return nil, err + } + rows = append(rows, row) } - return &row, err + + return rows, err } // DeleteFromExecutions deletes a single row from executions table diff --git a/tools/cli/admin.go b/tools/cli/admin.go index 8a324b5209d..187f9dad668 100644 --- a/tools/cli/admin.go +++ b/tools/cli/admin.go @@ -832,6 +832,37 @@ func newDBCommands() []cli.Command { AdminDBScan(c) }, }, + { + Name: "unsupported-workflow", + Usage: "use this command when upgrade the Cadence server from version less than 0.16.0. 
This scan database and detect unsupported workflow type.", + Flags: append(getDBFlags(), + cli.IntFlag{ + Name: FlagRPS, + Usage: "Maximum database requests per second while scanning", + Value: 1000, + }, + cli.StringFlag{ + Name: FlagOutputFilenameWithAlias, + Usage: "Output file to write to, if not provided output is written to stdout", + }, + cli.IntFlag{ + Name: FlagLowerShardBound, + Usage: "FlagLowerShardBound for the start shard to scan. (Default: 0)", + Value: 0, + Required: true, + }, + cli.IntFlag{ + Name: FlagUpperShardBound, + Usage: "FlagUpperShardBound for the end shard to scan. (Default: 16383)", + Value: 16383, + Required: true, + }, + ), + + Action: func(c *cli.Context) { + AdminDBScanUnsupportedWorkflow(c) + }, + }, { Name: "clean", Usage: "clean up corrupted workflows", @@ -853,6 +884,11 @@ func newDBCommands() []cli.Command { // TODO need to support other database: https://github.com/uber/cadence/issues/2777 func getDBFlags() []cli.Flag { return []cli.Flag{ + cli.StringFlag{ + Name: FlagDBType, + Value: "cassandra", + Usage: "persistence type. 
Current supported options are cassandra, mysql, postgres", + }, cli.StringFlag{ Name: FlagDBAddress, Value: "127.0.0.1", @@ -865,17 +901,34 @@ func getDBFlags() []cli.Flag { }, cli.StringFlag{ Name: FlagUsername, - Usage: "cassandra username", + Usage: "persistence username", }, cli.StringFlag{ Name: FlagPassword, - Usage: "cassandra password", + Usage: "persistence password", }, cli.StringFlag{ Name: FlagKeyspace, Value: "cadence", Usage: "cassandra keyspace", }, + cli.StringFlag{ + Name: FlagDatabaseName, + Value: "cadence", + Usage: "sql database name", + }, + cli.StringFlag{ + Name: FlagEncodingType, + Value: "thriftrw", + Usage: "sql database encoding type", + }, + cli.StringSliceFlag{ + Name: FlagDecodingTypes, + Value: &cli.StringSlice{ + "thriftrw", + }, + Usage: "sql database decoding types", + }, cli.BoolFlag{ Name: FlagEnableTLS, Usage: "enable TLS over cassandra connection", diff --git a/tools/cli/adminCommands.go b/tools/cli/adminCommands.go index 6482c77ef2d..1c0d02abc51 100644 --- a/tools/cli/adminCommands.go +++ b/tools/cli/adminCommands.go @@ -25,6 +25,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net" "strconv" "time" @@ -38,6 +39,8 @@ import ( "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/persistence" cassp "github.com/uber/cadence/common/persistence/cassandra" + "github.com/uber/cadence/common/persistence/sql" + "github.com/uber/cadence/common/persistence/sql/sqlplugin" "github.com/uber/cadence/common/service/config" "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/thrift" @@ -311,6 +314,44 @@ func connectToCassandra(c *cli.Context) *gocql.Session { return session } +func connectToSQL(c *cli.Context) sqlplugin.DB { + host := getRequiredOption(c, FlagDBAddress) + if !c.IsSet(FlagDBPort) { + ErrorAndExit("sql port is required", nil) + } + encodingType := c.String(FlagEncodingType) + decodingTypesStr := c.StringSlice(FlagDecodingTypes) + + sqlConfig := &config.SQL{ + 
ConnectAddr: net.JoinHostPort( + host, + c.String(FlagDBPort), + ), + PluginName: c.String(FlagDBType), + User: c.String(FlagUsername), + Password: c.String(FlagPassword), + DatabaseName: getRequiredOption(c, FlagDatabaseName), + EncodingType: encodingType, + DecodingTypes: decodingTypesStr, + } + + if c.Bool(FlagEnableTLS) { + sqlConfig.TLS = &auth.TLS{ + Enabled: true, + CertFile: c.String(FlagTLSCertPath), + KeyFile: c.String(FlagTLSKeyPath), + CaFile: c.String(FlagTLSCaPath), + EnableHostVerification: c.Bool(FlagTLSEnableHostVerification), + } + } + + db, err := sql.NewSQLDB(sqlConfig) + if err != nil { + ErrorAndExit("connect to SQL failed", err) + } + return db +} + // AdminGetDomainIDOrName map domain func AdminGetDomainIDOrName(c *cli.Context) { domainID := c.String(FlagDomainID) diff --git a/tools/cli/adminDBScanCommand.go b/tools/cli/adminDBScanCommand.go index 32f4a30adb3..dc059c937b4 100644 --- a/tools/cli/adminDBScanCommand.go +++ b/tools/cli/adminDBScanCommand.go @@ -23,17 +23,24 @@ package cli import ( + "context" "encoding/json" "fmt" "io" "os" + "time" "github.com/urfave/cli" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/collection" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/loggerimpl" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/cassandra" + "github.com/uber/cadence/common/persistence/serialization" + "github.com/uber/cadence/common/persistence/sql" + "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/reconciliation/fetcher" "github.com/uber/cadence/common/reconciliation/invariant" "github.com/uber/cadence/common/reconciliation/store" @@ -41,6 +48,10 @@ import ( "github.com/uber/cadence/service/worker/scanner/executions" ) +const ( + listContextTimeout = time.Minute +) + // AdminDBScan is used to scan over executions in database and detect corruptions. 
func AdminDBScan(c *cli.Context) { scanType, err := executions.ScanTypeString(c.String(FlagScanType)) @@ -49,7 +60,7 @@ func AdminDBScan(c *cli.Context) { ErrorAndExit("unknown scan type", err) } - numberOfShards := c.Int(FlagNumberOfShards) + numberOfShards := getRequiredIntOption(c, FlagNumberOfShards) collectionSlice := c.StringSlice(FlagInvariantCollection) var collections []invariant.Collection @@ -157,3 +168,144 @@ func checkExecution( return execution, invariant.NewInvariantManager(ivs).RunChecks(ctx, execution) } + +// AdminDBScanUnsupportedWorkflow is to scan DB for unsupported workflow for a new release +func AdminDBScanUnsupportedWorkflow(c *cli.Context) { + rps := c.Int(FlagRPS) + outputFile := getOutputFile(c.String(FlagOutputFilename)) + startShardID := c.Int(FlagLowerShardBound) + endShardID := c.Int(FlagUpperShardBound) + + defer outputFile.Close() + for i := startShardID; i <= endShardID; i++ { + listExecutionsByShardID(c, i, rps, outputFile) + fmt.Println(fmt.Sprintf("Shard %v scan operation is completed.", i)) + } +} + +func listExecutionsByShardID( + c *cli.Context, + shardID int, + rps int, + outputFile *os.File, +) { + + client := initializeExecutionStore(c, shardID, rps) + defer client.Close() + + paginationFunc := func(paginationToken []byte) ([]interface{}, []byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), listContextTimeout) + defer cancel() + + resp, err := client.ListConcreteExecutions( + ctx, + &persistence.ListConcreteExecutionsRequest{ + PageSize: 1000, + PageToken: paginationToken, + }, + ) + if err != nil { + return nil, nil, err + } + var paginateItems []interface{} + for _, history := range resp.Executions { + paginateItems = append(paginateItems, history) + } + return paginateItems, resp.PageToken, nil + } + + executionIterator := collection.NewPagingIterator(paginationFunc) + for executionIterator.HasNext() { + result, err := executionIterator.Next() + if err != nil { + ErrorAndExit(fmt.Sprintf("Failed 
to scan shard ID: %v for unsupported workflow. Please retry.", shardID), err) + } + execution := result.(*persistence.ListConcreteExecutionsEntity) + executionInfo := execution.ExecutionInfo + if executionInfo != nil && executionInfo.CloseStatus == 0 && execution.VersionHistories == nil { + + outStr := fmt.Sprintf("cadence --address : --domain <%v> workflow reset --wid %v --rid %v --reset_type LastDecisionCompleted --reason 'release 0.16 upgrade'\n", + executionInfo.DomainID, + executionInfo.WorkflowID, + executionInfo.RunID, + ) + if _, err = outputFile.WriteString(outStr); err != nil { + ErrorAndExit("Failed to write data to file", err) + } + if err = outputFile.Sync(); err != nil { + ErrorAndExit("Failed to sync data to file", err) + } + } + } +} + +func initializeExecutionStore( + c *cli.Context, + shardID int, + rps int, +) persistence.ExecutionManager { + + var execStore persistence.ExecutionStore + dbType := c.String(FlagDBType) + logger := loggerimpl.NewNopLogger() + switch dbType { + case "cassandra": + execStore = initializeCassandraExecutionClient(c, shardID, logger) + case "mysql": + execStore = initializeSQLExecutionStore(c, shardID, logger) + case "postgres": + execStore = initializeSQLExecutionStore(c, shardID, logger) + default: + ErrorAndExit("The DB type is not supported. 
Options are: cassandra, mysql, postgres.", nil) + } + + historyManager := persistence.NewExecutionManagerImpl(execStore, logger) + rateLimiter := quotas.NewSimpleRateLimiter(rps) + return persistence.NewWorkflowExecutionPersistenceRateLimitedClient(historyManager, rateLimiter, logger) +} + +func initializeCassandraExecutionClient( + c *cli.Context, + shardID int, + logger log.Logger, +) persistence.ExecutionStore { + + session := connectToCassandra(c) + execStore, err := cassandra.NewWorkflowExecutionPersistence( + shardID, + session, + logger, + ) + if err != nil { + ErrorAndExit("Failed to get execution store from cassandra config", err) + } + return execStore +} + +func initializeSQLExecutionStore( + c *cli.Context, + shardID int, + logger log.Logger, +) persistence.ExecutionStore { + + sqlDB := connectToSQL(c) + encodingType := c.String(FlagEncodingType) + decodingTypesStr := c.StringSlice(FlagDecodingTypes) + var decodingTypes []common.EncodingType + for _, dt := range decodingTypesStr { + decodingTypes = append(decodingTypes, common.EncodingType(dt)) + } + execStore, err := sql.NewSQLExecutionStore(sqlDB, logger, shardID, getSQLParser(common.EncodingType(encodingType), decodingTypes...)) + if err != nil { + ErrorAndExit("Failed to get execution store from sql config", err) + } + return execStore +} + +func getSQLParser(encodingType common.EncodingType, decodingTypes ...common.EncodingType) serialization.Parser { + parser, err := serialization.NewParser(encodingType, decodingTypes...) 
+ if err != nil { + ErrorAndExit("failed to initialize sql parser", err) + } + return parser +} diff --git a/tools/cli/adminKafkaCommands.go b/tools/cli/adminKafkaCommands.go index fb2e4cc58a9..6a67077e932 100644 --- a/tools/cli/adminKafkaCommands.go +++ b/tools/cli/adminKafkaCommands.go @@ -175,6 +175,7 @@ func getOutputFile(outputFile string) *os.File { if err != nil { ErrorAndExit("failed to create output file", err) } + return f } diff --git a/tools/cli/app.go b/tools/cli/app.go index 47d8f426c64..1882eb8e45b 100644 --- a/tools/cli/app.go +++ b/tools/cli/app.go @@ -27,7 +27,7 @@ import ( const ( // Version is the controlled version string. It should be updated every time // before we release a new version. - Version = "0.15.0" + Version = "0.16.0" ) // SetFactory is used to set the ClientFactory global diff --git a/tools/cli/flags.go b/tools/cli/flags.go index 7e2dba36189..508441eefd8 100644 --- a/tools/cli/flags.go +++ b/tools/cli/flags.go @@ -27,9 +27,13 @@ const ( FlagUsername = "username" FlagPassword = "password" FlagKeyspace = "keyspace" + FlagDatabaseName = "db_name" + FlagEncodingType = "encoding_type" + FlagDecodingTypes = "decoding_types" FlagAddress = "address" FlagAddressWithAlias = FlagAddress + ", ad" FlagHistoryAddress = "history_address" + FlagDBType = "db_type" FlagDBAddress = "db_address" FlagDBPort = "db_port" FlagHistoryAddressWithAlias = FlagHistoryAddress + ", had"