diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go index 3cabb6a93b7..ad3f010c3b1 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go @@ -136,7 +136,7 @@ const ( `}` templateActivityInfoType = `{` + - `version: ?,` + + `version: ?, ` + `schedule_id: ?, ` + `scheduled_event_batch_id: ?, ` + `scheduled_event: ?, ` + @@ -172,7 +172,7 @@ const ( `}` templateTimerInfoType = `{` + - `version: ?,` + + `version: ?, ` + `timer_id: ?, ` + `started_id: ?, ` + `expiry_time: ?, ` + @@ -220,10 +220,10 @@ const ( `}` templateUpdateCurrentWorkflowExecutionQuery = `UPDATE executions USING TTL 0 ` + - `SET current_run_id = ?, -execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, -workflow_last_write_version = ?, -workflow_state = ? ` + + `SET current_run_id = ?, ` + + `execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, ` + + `workflow_last_write_version = ?, ` + + `workflow_state = ? ` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? ` + @@ -333,7 +333,7 @@ workflow_state = ? ` + `IF next_event_id = ? ` templateUpdateActivityInfoQuery = `UPDATE executions ` + - `SET activity_map[ ? ] =` + templateActivityInfoType + ` ` + + `SET activity_map[ ? ] = ` + templateActivityInfoType + ` ` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? ` + @@ -343,7 +343,7 @@ workflow_state = ? ` + `and task_id = ? ` templateResetActivityInfoQuery = `UPDATE executions ` + - `SET activity_map = ?` + + `SET activity_map = ? ` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? ` + @@ -353,7 +353,7 @@ workflow_state = ? ` + `and task_id = ? ` templateUpdateTimerInfoQuery = `UPDATE executions ` + - `SET timer_map[ ? ] =` + templateTimerInfoType + ` ` + + `SET timer_map[ ? ] = ` + templateTimerInfoType + ` ` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? ` + @@ -363,7 +363,7 @@ workflow_state = ? ` + `and task_id = ? ` templateResetTimerInfoQuery = `UPDATE executions ` + - `SET timer_map = ?` + + `SET timer_map = ? ` + `WHERE shard_id = ? ` + `and type = ? ` + `and domain_id = ? 
` + diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go index 4010a79758d..97e7eb1c0c8 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils.go @@ -1447,7 +1447,7 @@ func mustConvertToSlice(value interface{}) []interface{} { } return result default: - panic(fmt.Sprintf("Unable to convert %v to slice", value)) + panic(fmt.Sprintf("Unable to convert %v of type %T to slice", value, value)) } } diff --git a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go index 5fb565ceb19..8df87c33afc 100644 --- a/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go +++ b/common/persistence/nosql/nosqlplugin/cassandra/workflow_utils_test.go @@ -24,12 +24,14 @@ package cassandra import ( "context" "fmt" + "reflect" "regexp" "strings" "testing" "time" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/checksum" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql" @@ -75,8 +77,23 @@ type fakeBatch struct { // Query is fake implementation of gocql.Batch.Query func (b *fakeBatch) Query(queryTmpl string, args ...interface{}) { + argsSanitized := make([]interface{}, len(args)) + for i, arg := range args { + // use values instead of pointers so that we can compare them in tests + if v := reflect.ValueOf(arg); v.Kind() == reflect.Ptr && !v.IsNil() { + argsSanitized[i] = v.Elem().Interface() + } else { + argsSanitized[i] = arg + } + + if t, ok := argsSanitized[i].(time.Time); ok { + // use fixed time format to avoid flakiness + argsSanitized[i] = t.UTC().Format(time.RFC3339) + } + + } queryTmpl = strings.ReplaceAll(queryTmpl, "?", "%v") - b.queries = append(b.queries, fmt.Sprintf(queryTmpl, args...)) + b.queries = append(b.queries, fmt.Sprintf(queryTmpl, argsSanitized...)) } // WithContext is fake implementation of gocql.Batch.WithContext @@ -719,14 +736,14 @@ func TestTransferTasks(t *testing.T) { wantQueries: []string{ `INSERT INTO executions (shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` + `VALUES(1000, 2, 10000000-3000-f000-f000-000000000000, 20000000-3000-f000-f000-000000000000, 30000000-3000-f000-f000-000000000000, ` + - `{domain_id: domain_xyz, workflow_id: workflow_xyz, run_id: rundid_1, visibility_ts: 2023-12-12 22:08:41 +0000 UTC, ` + + `{domain_id: domain_xyz, workflow_id: workflow_xyz, run_id: rundid_1, visibility_ts: 2023-12-12T22:08:41Z, ` + `task_id: 355, target_domain_id: e2bf2c8f-0ddf-4451-8840-27cfe8addd62, target_domain_ids: map[],` + `target_workflow_id: 20000000-0000-f000-f000-000000000001, target_run_id: 30000000-0000-f000-f000-000000000002, ` + `target_child_workflow_only: true, task_list: tasklist_1, type: 0, schedule_id: 14, record_visibility: false, version: 1}, ` + `946684800000, 355)`, `INSERT INTO executions (shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` + `VALUES(1000, 2, 10000000-3000-f000-f000-000000000000, 20000000-3000-f000-f000-000000000000, 30000000-3000-f000-f000-000000000000, ` + - `{domain_id: domain_xyz, workflow_id: workflow_xyz, run_id: rundid_2, visibility_ts: 2023-12-12 22:09:41 +0000 UTC, ` + + `{domain_id: domain_xyz, 
workflow_id: workflow_xyz, run_id: rundid_2, visibility_ts: 2023-12-12T22:09:41Z, ` + `task_id: 220, target_domain_id: e2bf2c8f-0ddf-4451-8840-27cfe8addd62, target_domain_ids: map[],` + `target_workflow_id: 20000000-0000-f000-f000-000000000001, target_run_id: 30000000-0000-f000-f000-000000000002, ` + `target_child_workflow_only: true, task_list: tasklist_2, type: 0, schedule_id: 3, record_visibility: false, version: 1}, ` + @@ -790,7 +807,7 @@ func TestCrossClusterTasks(t *testing.T) { wantQueries: []string{ `INSERT INTO executions (shard_id, type, domain_id, workflow_id, run_id, cross_cluster, visibility_ts, task_id) ` + `VALUES(1000, 6, 10000000-7000-f000-f000-000000000000, , 30000000-7000-f000-f000-000000000000, ` + - `{domain_id: domain_xyz, workflow_id: workflow_xyz, run_id: rundid_1, visibility_ts: 2023-12-12 22:08:41 +0000 UTC, ` + + `{domain_id: domain_xyz, workflow_id: workflow_xyz, run_id: rundid_1, visibility_ts: 2023-12-12T22:08:41Z, ` + `task_id: 355, target_domain_id: e2bf2c8f-0ddf-4451-8840-27cfe8addd62, target_domain_ids: map[],` + `target_workflow_id: 20000000-0000-f000-f000-000000000001, target_run_id: 30000000-0000-f000-f000-000000000002, ` + `target_child_workflow_only: true, task_list: tasklist_1, type: 0, schedule_id: 14, record_visibility: false, version: 1}, ` + @@ -1349,6 +1366,1125 @@ func TestUpdateChildExecutionInfos(t *testing.T) { } } +func TestResetTimerInfos(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-12T22:08:41Z") + if err != nil { + t.Fatal(err) + } + tests := []struct { + desc string + shardID int + domainID string + workflowID string + runID string + timerInfos map[string]*persistence.TimerInfo + // expectations + wantQueries []string + }{ + { + desc: "ok", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + runID: "runid1", + timerInfos: map[string]*persistence.TimerInfo{ + "timer1": { + Version: 1, + TimerID: "timer1", + StartedID: 2, + ExpiryTime: ts.UTC(), + TaskStatus: 1, + }, + }, + wantQueries: []string{ + `UPDATE executions SET timer_map = map[` + + `timer1:map[expiry_time:2023-12-12 22:08:41 +0000 UTC started_id:2 task_id:1 timer_id:timer1 version:1]` + + `] WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = runid1 and visibility_ts = 946684800000 and task_id = -10 `, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := resetTimerInfos(batch, tc.shardID, tc.domainID, tc.workflowID, tc.runID, tc.timerInfos) + if err != nil { + t.Fatalf("resetTimerInfos() error = %v", err) + } + + if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { + t.Fatalf("Query mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestUpdateTimerInfos(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-19T22:08:41Z") + if err != nil { + t.Fatal(err) + } + + tests := []struct { + desc string + shardID int + domainID string + workflowID string + runID string + timerInfos map[string]*persistence.TimerInfo + deleteInfos []string + // expectations + wantQueries []string + }{ + { + desc: "update and delete timer infos", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + runID: "runid1", + timerInfos: map[string]*persistence.TimerInfo{ + "timer1": { + TimerID: "timer1", + Version: 1, + StartedID: 2, + ExpiryTime: ts.UTC(), + TaskStatus: 1, + }, + }, + deleteInfos: []string{"timer2"}, + wantQueries: []string{ + `UPDATE executions SET timer_map[ timer1 ] = {` + + `version: 1, 
timer_id: timer1, started_id: 2, expiry_time: 2023-12-19T22:08:41Z, task_id: 1` + + `} WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = runid1 and visibility_ts = 946684800000 and task_id = -10 `, + `DELETE timer_map[ timer2 ] FROM executions WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = runid1 and visibility_ts = 946684800000 and task_id = -10 `, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := updateTimerInfos(batch, tc.shardID, tc.domainID, tc.workflowID, tc.runID, tc.timerInfos, tc.deleteInfos) + if err != nil { + t.Fatalf("updateTimerInfos() error = %v", err) + } + + if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { + t.Fatalf("Query mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestResetActivityInfos(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-19T22:08:41Z") + if err != nil { + t.Fatal(err) + } + + tests := []struct { + desc string + shardID int + domainID string + workflowID string + runID string + activityInfos map[int64]*persistence.InternalActivityInfo + // expectations + wantQueries []string + }{ + { + desc: "ok", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + runID: "runid1", + activityInfos: map[int64]*persistence.InternalActivityInfo{ + 1: { + Version: 1, + ScheduledEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-scheduled-event-data"), + }, + ScheduledTime: ts.UTC(), + ScheduleID: 1, + StartedID: 2, + StartedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-started-event-data"), + }, + ActivityID: "activity1", + ScheduleToStartTimeout: 1 * time.Minute, + ScheduleToCloseTimeout: 2 * time.Minute, + StartToCloseTimeout: 3 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + Attempt: 3, + MaximumAttempts: 5, + TaskList: "tasklist1", + HasRetryPolicy: true, + LastFailureReason: "retry reason", + }, + 2: { + Version: 1, + ScheduledEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-scheduled-event-data"), + }, + ScheduledTime: ts.UTC(), + ScheduleID: 2, + StartedID: 3, + StartedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-started-event-data"), + }, + ActivityID: "activity2", + ScheduleToStartTimeout: 1 * time.Minute, + ScheduleToCloseTimeout: 2 * time.Minute, + StartToCloseTimeout: 3 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + Attempt: 1, + MaximumAttempts: 5, + TaskList: "tasklist1", + HasRetryPolicy: true, + LastFailureReason: "another retry reason", + }, + }, + wantQueries: []string{ + `UPDATE executions SET activity_map = map[` + + `1:map[` + + `activity_id:activity1 attempt:3 backoff_coefficient:0 cancel_request_id:0 cancel_requested:false ` + + `details:[] event_data_encoding:thriftrw expiration_time:0001-01-01 00:00:00 +0000 UTC has_retry_policy:true ` + + `heart_beat_timeout:60 init_interval:0 last_failure_details:[] last_failure_reason:retry reason ` + + `last_hb_updated_time:0001-01-01 00:00:00 +0000 UTC last_worker_identity: max_attempts:5 max_interval:0 ` + + `non_retriable_errors:[] request_id: schedule_id:1 schedule_to_close_timeout:120 schedule_to_start_timeout:60 ` + + `scheduled_event:[116 104 114 105 102 116 45 101 110 99 111 100 101 100 45 115 99 104 101 100 117 108 101 100 45 
101 118 101 110 116 45 100 97 116 97] ` + + `scheduled_event_batch_id:0 scheduled_time:2023-12-19 22:08:41 +0000 UTC start_to_close_timeout:180 ` + + `started_event:[116 104 114 105 102 116 45 101 110 99 111 100 101 100 45 115 116 97 114 116 101 100 45 101 118 101 110 116 45 100 97 116 97] ` + + `started_id:2 started_identity: started_time:0001-01-01 00:00:00 +0000 UTC task_list:tasklist1 timer_task_status:0 version:1` + + `] ` + + `2:map[` + + `activity_id:activity2 attempt:1 backoff_coefficient:0 cancel_request_id:0 cancel_requested:false ` + + `details:[] event_data_encoding:thriftrw expiration_time:0001-01-01 00:00:00 +0000 UTC has_retry_policy:true ` + + `heart_beat_timeout:60 init_interval:0 last_failure_details:[] last_failure_reason:another retry reason ` + + `last_hb_updated_time:0001-01-01 00:00:00 +0000 UTC last_worker_identity: max_attempts:5 max_interval:0 ` + + `non_retriable_errors:[] request_id: schedule_id:2 schedule_to_close_timeout:120 schedule_to_start_timeout:60 ` + + `scheduled_event:[116 104 114 105 102 116 45 101 110 99 111 100 101 100 45 115 99 104 101 100 117 108 101 100 45 101 118 101 110 116 45 100 97 116 97] ` + + `scheduled_event_batch_id:0 scheduled_time:2023-12-19 22:08:41 +0000 UTC start_to_close_timeout:180 ` + + `started_event:[116 104 114 105 102 116 45 101 110 99 111 100 101 100 45 115 116 97 114 116 101 100 45 101 118 101 110 116 45 100 97 116 97] ` + + `started_id:3 started_identity: started_time:0001-01-01 00:00:00 +0000 UTC task_list:tasklist1 timer_task_status:0 version:1` + + `]` + + `] WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = runid1 and visibility_ts = 946684800000 and task_id = -10 `, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := resetActivityInfos(batch, tc.shardID, tc.domainID, tc.workflowID, tc.runID, tc.activityInfos) + if err != nil { + t.Fatalf("resetActivityInfos() error = %v", err) + } + + if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { + t.Fatalf("Query mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestUpdateActivityInfos(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-19T22:08:41Z") + if err != nil { + t.Fatal(err) + } + + tests := []struct { + desc string + shardID int + domainID string + workflowID string + runID string + activityInfos map[int64]*persistence.InternalActivityInfo + deleteInfos []int64 + // expectations + wantQueries []string + }{ + { + desc: "update and delete activity infos", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + runID: "runid1", + activityInfos: map[int64]*persistence.InternalActivityInfo{ + 1: { + Version: 1, + ScheduledEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-scheduled-event-data"), + }, + ScheduledTime: ts.UTC(), + ScheduleID: 1, + StartedID: 2, + StartedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-started-event-data"), + }, + ActivityID: "activity1", + ScheduleToStartTimeout: 1 * time.Minute, + ScheduleToCloseTimeout: 2 * time.Minute, + StartToCloseTimeout: 3 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + Attempt: 3, + MaximumAttempts: 5, + TaskList: "tasklist1", + HasRetryPolicy: true, + LastFailureReason: "retry reason", + }, + }, + deleteInfos: []int64{2}, + wantQueries: []string{ + `UPDATE executions SET activity_map[ 1 ] = {` + + `version: 1, schedule_id: 1, 
scheduled_event_batch_id: 0, ` + + `scheduled_event: [116 104 114 105 102 116 45 101 110 99 111 100 101 100 45 115 99 104 101 100 117 108 101 100 45 101 118 101 110 116 45 100 97 116 97], ` + + `scheduled_time: 2023-12-19T22:08:41Z, started_id: 2, ` + + `started_event: [116 104 114 105 102 116 45 101 110 99 111 100 101 100 45 115 116 97 114 116 101 100 45 101 118 101 110 116 45 100 97 116 97], ` + + `started_time: 0001-01-01T00:00:00Z, activity_id: activity1, request_id: , ` + + `details: [], schedule_to_start_timeout: 60, schedule_to_close_timeout: 120, start_to_close_timeout: 180, ` + + `heart_beat_timeout: 60, cancel_requested: false, cancel_request_id: 0, last_hb_updated_time: 0001-01-01T00:00:00Z, ` + + `timer_task_status: 0, attempt: 3, task_list: tasklist1, started_identity: , has_retry_policy: true, ` + + `init_interval: 0, backoff_coefficient: 0, max_interval: 0, expiration_time: 0001-01-01T00:00:00Z, ` + + `max_attempts: 5, non_retriable_errors: [], last_failure_reason: retry reason, last_worker_identity: , ` + + `last_failure_details: [], event_data_encoding: thriftrw` + + `} WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = runid1 and visibility_ts = 946684800000 and task_id = -10 `, + `DELETE activity_map[ 2 ] FROM executions ` + + `WHERE shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = runid1 and visibility_ts = 946684800000 and task_id = -10 `, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := updateActivityInfos(batch, tc.shardID, tc.domainID, tc.workflowID, tc.runID, tc.activityInfos, tc.deleteInfos) + if err != nil { + t.Fatalf("updateActivityInfos() error = %v", err) + } + + if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { + t.Fatalf("Query mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestCreateWorkflowExecutionWithMergeMaps(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-19T22:08:41Z") + if err != nil { + t.Fatal(err) + } + + tests := []struct { + desc string + shardID int + domainID string + workflowID string + execution *nosqlplugin.WorkflowExecutionRequest + // expectations + wantQueries int + wantErr bool + }{ + { + desc: "EventBufferWriteMode is not None", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeAppend, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + wantErr: true, + }, + { + desc: "MapsWriteMode is not Create", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeNone, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeUpdate, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + wantErr: true, + }, + { + desc: "ok", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: 
nosqlplugin.EventBufferWriteModeNone, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeCreate, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + ActivityInfos: map[int64]*persistence.InternalActivityInfo{ + 1: { + Version: 1, + ScheduledEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-scheduled-event-data"), + }, + ScheduledTime: ts.UTC(), + ScheduleID: 1, + StartedID: 2, + StartedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-started-event-data"), + }, + ActivityID: "activity1", + ScheduleToStartTimeout: 1 * time.Minute, + ScheduleToCloseTimeout: 2 * time.Minute, + StartToCloseTimeout: 3 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + Attempt: 3, + MaximumAttempts: 5, + TaskList: "tasklist1", + HasRetryPolicy: true, + LastFailureReason: "retry reason", + }, + }, + TimerInfos: map[string]*persistence.TimerInfo{ + "timer1": { + Version: 1, + TimerID: "timer1", + StartedID: 2, + ExpiryTime: ts, + TaskStatus: 1, + }, + }, + ChildWorkflowInfos: map[int64]*persistence.InternalChildExecutionInfo{ + 1: { + Version: 1, + InitiatedID: 1, + InitiatedEventBatchID: 2, + StartedID: 3, + StartedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + }, + StartedWorkflowID: "startedWorkflowID1", + StartedRunID: "startedRunID1", + CreateRequestID: "createRequestID1", + InitiatedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + }, + DomainID: "domain1", + WorkflowTypeName: "workflowType1", + ParentClosePolicy: types.ParentClosePolicyAbandon, + }, + }, + RequestCancelInfos: map[int64]*persistence.RequestCancelInfo{ + 1: { + Version: 1, + InitiatedID: 1, + InitiatedEventBatchID: 2, + CancelRequestID: "cancelRequest1", + }, + }, + SignalInfos: map[int64]*persistence.SignalInfo{ + 1: { + Version: 1, + InitiatedID: 1, + InitiatedEventBatchID: 2, + SignalRequestID: "request1", + SignalName: "signal1", + Input: []byte("input1"), + Control: []byte("control1"), + }, + }, + SignalRequestedIDs: []string{"signalRequestedID1"}, + }, + // expecting 7 queries: + // - 1 for execution record + // - 1 for activity info + // - 1 for timer info + // - 1 for child execution info + // - 1 for request cancel info + // - 1 for signal info + // - 1 for signal requested IDs + wantQueries: 7, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := createWorkflowExecutionWithMergeMaps(batch, tc.shardID, tc.domainID, tc.workflowID, tc.execution) + gotErr := (err != nil) + if gotErr != tc.wantErr { + t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr) + } + if gotErr { + return + } + + // Actual queries generated by helper functions are covered in other unit tests; here we only check the total number of queries. 
+ if got := len(batch.queries); got != tc.wantQueries { + t.Fatalf("len(queries): %v, want: %v", got, tc.wantQueries) + } + }) + } +} + +func TestResetWorkflowExecutionAndMapsAndEventBuffer(t *testing.T) { + tests := []struct { + desc string + shardID int + domainID string + workflowID string + execution *nosqlplugin.WorkflowExecutionRequest + // expectations + wantQueries int + wantErr bool + }{ + { + desc: "EventBufferWriteMode is not Clear", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeAppend, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + wantErr: true, + }, + { + desc: "MapsWriteMode is not Reset", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeClear, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeUpdate, // Incorrect mode + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + wantErr: true, + }, + { + desc: "ok", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeClear, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeReset, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + // expecting 8 queries: + // - 1 for execution record + // - 1 for deletion of buffered events + // - 1 for activity info map reset + // - 1 for timer info map reset + // - 1 for child execution info map reset + // - 1 for request cancel info map reset + // - 1 for signal info map reset + // - 1 for signal requested IDs reset + wantQueries: 8, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := resetWorkflowExecutionAndMapsAndEventBuffer(batch, tc.shardID, tc.domainID, tc.workflowID, tc.execution) + gotErr := (err != nil) + if gotErr != tc.wantErr { + t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr) + } + if gotErr { + return + } + + // Check only the total number of queries; the actual queries generated by the helper functions are covered in other unit tests. 
+ if got := len(batch.queries); got != tc.wantQueries { + t.Fatalf("len(queries): %v, want: %v", got, tc.wantQueries) + } + }) + } +} + +func TestUpdateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-19T22:08:41Z") + if err != nil { + t.Fatal(err) + } + tests := []struct { + desc string + shardID int + domainID string + workflowID string + execution *nosqlplugin.WorkflowExecutionRequest + // expectations + wantQueries int + wantErr bool + }{ + { + desc: "MapsWriteMode is not Update", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeClear, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeCreate, // Incorrect mode + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + wantErr: true, + }, + { + desc: "ok", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeClear, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeUpdate, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + ActivityInfos: map[int64]*persistence.InternalActivityInfo{ + 1: { + Version: 1, + ScheduledEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-scheduled-event-data"), + }, + ScheduledTime: ts.UTC(), + ScheduleID: 1, + StartedID: 2, + StartedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + Data: []byte("thrift-encoded-started-event-data"), + }, + ActivityID: "activity1", + ScheduleToStartTimeout: 1 * time.Minute, + ScheduleToCloseTimeout: 2 * time.Minute, + StartToCloseTimeout: 3 * time.Minute, + HeartbeatTimeout: 1 * time.Minute, + Attempt: 3, + MaximumAttempts: 5, + TaskList: "tasklist1", + HasRetryPolicy: true, + LastFailureReason: "retry reason", + }, + }, + TimerInfos: map[string]*persistence.TimerInfo{ + "timer1": { + Version: 1, + TimerID: "timer1", + StartedID: 2, + ExpiryTime: ts.UTC(), + TaskStatus: 1, + }, + }, + ChildWorkflowInfos: map[int64]*persistence.InternalChildExecutionInfo{ + 1: { + Version: 1, + InitiatedID: 1, + InitiatedEventBatchID: 2, + StartedID: 3, + StartedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + }, + StartedWorkflowID: "startedWorkflowID1", + StartedRunID: "startedRunID1", + CreateRequestID: "createRequestID1", + InitiatedEvent: &persistence.DataBlob{ + Encoding: common.EncodingTypeThriftRW, + }, + DomainID: "domain1", + WorkflowTypeName: "workflowType1", + ParentClosePolicy: types.ParentClosePolicyAbandon, + }, + }, + RequestCancelInfos: map[int64]*persistence.RequestCancelInfo{ + 1: { + Version: 1, + InitiatedID: 1, + InitiatedEventBatchID: 2, + CancelRequestID: "cancelRequest1", + }, + }, + SignalInfos: map[int64]*persistence.SignalInfo{ + 1: { + Version: 1, + InitiatedID: 1, + InitiatedEventBatchID: 2, + SignalRequestID: "request1", + SignalName: "signal1", + Input: []byte("input1"), + Control: []byte("control1"), + }, + }, + SignalRequestedIDs: 
[]string{"signalRequestedID1"}, + }, + // expecting 8 queries: + // - 1 for execution record + // - 1 for deletion of buffered events + // - 1 for activity info map update + // - 1 for timer info map update + // - 1 for child execution info map update + // - 1 for request cancel info map update + // - 1 for signal info map update + // - 1 for signal requested IDs update + wantQueries: 8, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(batch, tc.shardID, tc.domainID, tc.workflowID, tc.execution) + gotErr := (err != nil) + if gotErr != tc.wantErr { + t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr) + } + if gotErr { + return + } + + // Check only the total number of queries; the actual queries generated by the helper functions are covered in other unit tests. + if got := len(batch.queries); got != tc.wantQueries { + t.Fatalf("len(queries): %v, want: %v", got, tc.wantQueries) + } + }) + } +} + +func TestUpdateWorkflowExecution(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-19T22:08:41Z") + if err != nil { + t.Fatal(err) + } + tests := []struct { + desc string + shardID int + domainID string + workflowID string + execution *nosqlplugin.WorkflowExecutionRequest + // expectations + wantQueries []string + }{ + { + desc: "ok", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeClear, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeUpdate, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + DomainID: "domain1", + WorkflowID: "workflow1", + RunID: "runid1", + ParentRunID: "parentRunID1", + WorkflowTypeName: "workflowType1", + TaskList: "tasklist1", + StartTimestamp: ts, + LastUpdatedTimestamp: ts.Add(1 * time.Minute), + DecisionScheduleID: 2, + DecisionStartedID: 3, + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + PreviousNextEventIDCondition: common.Int64Ptr(10), + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + wantQueries: []string{ + `UPDATE executions SET execution = {` + + `domain_id: domain1, workflow_id: workflow1, run_id: runid1, first_run_id: , parent_domain_id: , parent_workflow_id: , ` + + `parent_run_id: parentRunID1, initiated_id: 0, completion_event_batch_id: 0, completion_event: [], ` + + `completion_event_data_encoding: , task_list: tasklist1, workflow_type_name: workflowType1, workflow_timeout: 0, ` + + `decision_task_timeout: 0, execution_context: [], state: 0, close_status: 0, last_first_event_id: 0, last_event_task_id: 0, ` + + `next_event_id: 0, last_processed_event: 0, start_time: 2023-12-19T22:08:41Z, last_updated_time: 2023-12-19T22:09:41Z, ` + + `create_request_id: , signal_count: 0, history_size: 0, decision_version: 0, decision_schedule_id: 2, decision_started_id: 3, ` + + `decision_request_id: , decision_timeout: 0, decision_attempt: 0, decision_timestamp: -6795364578871345152, ` + + `decision_scheduled_timestamp: -6795364578871345152, decision_original_scheduled_timestamp: -6795364578871345152, ` + + `cancel_requested: false, cancel_request_id: , sticky_task_list: , sticky_schedule_to_start_timeout: 0,client_library_version: , ` + + `client_feature_version: , client_impl: , auto_reset_points: [], auto_reset_points_encoding: , attempt: 0, has_retry_policy: false, ` + + `init_interval: 0, 
backoff_coefficient: 0, max_interval: 0, expiration_time: 0001-01-01T00:00:00Z, max_attempts: 0, ` + + `non_retriable_errors: [], event_store_version: 2, branch_token: [], cron_schedule: , expiration_seconds: 0, search_attributes: map[], ` + + `memo: map[], partition_config: map[] ` + + `}, next_event_id = 0 , version_histories = [] , version_histories_encoding = , checksum = {version: 0, flavor: 0, value: [] }, workflow_last_write_version = 0 , workflow_state = 0 ` + + `WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = runid1 and visibility_ts = 946684800000 and task_id = -10 ` + + `IF next_event_id = 10 `, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := updateWorkflowExecution(batch, tc.shardID, tc.domainID, tc.workflowID, tc.execution) + if err != nil { + t.Fatalf("updateWorkflowExecution failed, err: %v", err) + } + + if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { + t.Fatalf("Query mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestCreateWorkflowExecution(t *testing.T) { + ts, err := time.Parse(time.RFC3339, "2023-12-19T22:08:41Z") + if err != nil { + t.Fatal(err) + } + tests := []struct { + desc string + shardID int + domainID string + workflowID string + execution *nosqlplugin.WorkflowExecutionRequest + // expectations + wantQueries []string + }{ + { + desc: "ok", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.WorkflowExecutionRequest{ + EventBufferWriteMode: nosqlplugin.EventBufferWriteModeClear, + MapsWriteMode: nosqlplugin.WorkflowExecutionMapsWriteModeUpdate, + InternalWorkflowExecutionInfo: persistence.InternalWorkflowExecutionInfo{ + DomainID: "domain1", + WorkflowID: "workflow1", + RunID: "runid1", + ParentRunID: "parentRunID1", + WorkflowTypeName: "workflowType1", + TaskList: "tasklist1", + StartTimestamp: ts, + LastUpdatedTimestamp: ts.Add(1 * time.Minute), + DecisionScheduleID: 2, + DecisionStartedID: 3, + CompletionEvent: &persistence.DataBlob{}, + AutoResetPoints: &persistence.DataBlob{}, + }, + PreviousNextEventIDCondition: common.Int64Ptr(10), + VersionHistories: &persistence.DataBlob{}, + Checksums: &checksum.Checksum{}, + }, + wantQueries: []string{ + `INSERT INTO executions (shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, version_histories, version_histories_encoding, checksum, workflow_last_write_version, workflow_state) ` + + `VALUES(1000, domain1, workflow1, runid1, 1, ` + + `{domain_id: domain1, workflow_id: workflow1, run_id: runid1, first_run_id: , parent_domain_id: , parent_workflow_id: , ` + + `parent_run_id: parentRunID1, initiated_id: 0, completion_event_batch_id: 0, completion_event: [], completion_event_data_encoding: , ` + + `task_list: tasklist1, workflow_type_name: workflowType1, workflow_timeout: 0, decision_task_timeout: 0, execution_context: [], state: 0, ` + + `close_status: 0, last_first_event_id: 0, last_event_task_id: 0, next_event_id: 0, last_processed_event: 0, start_time: 2023-12-19T22:08:41Z, ` + + `last_updated_time: 2023-12-19T22:09:41Z, create_request_id: , signal_count: 0, history_size: 0, decision_version: 0, ` + + `decision_schedule_id: 2, decision_started_id: 3, decision_request_id: , decision_timeout: 0, decision_attempt: 0, ` + + `decision_timestamp: -6795364578871345152, decision_scheduled_timestamp: -6795364578871345152, decision_original_scheduled_timestamp: 
-6795364578871345152, ` + + `cancel_requested: false, cancel_request_id: , sticky_task_list: , sticky_schedule_to_start_timeout: 0,client_library_version: , client_feature_version: , ` + + `client_impl: , auto_reset_points: [], auto_reset_points_encoding: , attempt: 0, has_retry_policy: false, init_interval: 0, ` + + `backoff_coefficient: 0, max_interval: 0, expiration_time: 0001-01-01T00:00:00Z, max_attempts: 0, non_retriable_errors: [], ` + + `event_store_version: 2, branch_token: [], cron_schedule: , expiration_seconds: 0, search_attributes: map[], memo: map[], partition_config: map[] ` + + `}, 0, 946684800000, -10, [], , {version: 0, flavor: 0, value: [] }, 0, 0) IF NOT EXISTS `, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := createWorkflowExecution(batch, tc.shardID, tc.domainID, tc.workflowID, tc.execution) + if err != nil { + t.Fatalf("createWorkflowExecution failed, err: %v", err) + } + + if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { + t.Fatalf("Query mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestCreateOrUpdateWorkflowExecution(t *testing.T) { + tests := []struct { + desc string + shardID int + domainID string + workflowID string + execution *nosqlplugin.CurrentWorkflowWriteRequest + // expectations + wantQueries []string + wantErr bool + }{ + { + desc: "unknown write mode", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: 255, // unknown write mode + }, + wantErr: true, + }, + { + desc: "CurrentWorkflowWriteModeNoop", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop, + }, + wantQueries: nil, + }, + { + desc: "CurrentWorkflowWriteModeInsert", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeInsert, + Row: nosqlplugin.CurrentWorkflowRow{ + RunID: "runid1", + CreateRequestID: "createRequestID1", + State: persistence.WorkflowStateCreated, + CloseStatus: persistence.WorkflowCloseStatusNone, + }, + }, + wantQueries: []string{ + `INSERT INTO executions (shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution, workflow_last_write_version, workflow_state) ` + + `VALUES(1000, 1, domain1, workflow1, 30000000-0000-f000-f000-000000000001, 946684800000, -10, runid1, ` + + `{run_id: runid1, create_request_id: createRequestID1, state: 0, close_status: 0}, 0, 0) ` + + `IF NOT EXISTS USING TTL 0 `, + }, + }, + { + desc: "CurrentWorkflowWriteModeUpdate and condition missing", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate, + Row: nosqlplugin.CurrentWorkflowRow{ + RunID: "runid1", + CreateRequestID: "createRequestID1", + State: persistence.WorkflowStateCreated, + CloseStatus: persistence.WorkflowCloseStatusNone, + }, + }, + wantErr: true, + }, + { + desc: "CurrentWorkflowWriteModeUpdate and condition runid missing", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate, + Condition: &nosqlplugin.CurrentWorkflowWriteCondition{ + CurrentRunID: nil, + }, + Row: 
nosqlplugin.CurrentWorkflowRow{ + RunID: "runid1", + CreateRequestID: "createRequestID1", + State: persistence.WorkflowStateCreated, + CloseStatus: persistence.WorkflowCloseStatusNone, + }, + }, + wantErr: true, + }, + { + desc: "CurrentWorkflowWriteModeUpdate with LastWriteVersion", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate, + Condition: &nosqlplugin.CurrentWorkflowWriteCondition{ + CurrentRunID: common.StringPtr("runid1"), + LastWriteVersion: common.Int64Ptr(1), + }, + Row: nosqlplugin.CurrentWorkflowRow{ + RunID: "runid1", + CreateRequestID: "createRequestID1", + State: persistence.WorkflowStateCreated, + CloseStatus: persistence.WorkflowCloseStatusNone, + }, + }, + wantQueries: []string{ + `UPDATE executions USING TTL 0 SET ` + + `current_run_id = runid1, ` + + `execution = {run_id: runid1, create_request_id: createRequestID1, state: 0, close_status: 0}, ` + + `workflow_last_write_version = 0, workflow_state = 0 ` + + `WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = 30000000-0000-f000-f000-000000000001 and visibility_ts = 946684800000 and task_id = -10 ` + + `IF current_run_id = runid1 `, + }, + }, + { + desc: "CurrentWorkflowWriteModeUpdate", + shardID: 1000, + domainID: "domain1", + workflowID: "workflow1", + execution: &nosqlplugin.CurrentWorkflowWriteRequest{ + WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate, + Condition: &nosqlplugin.CurrentWorkflowWriteCondition{ + CurrentRunID: common.StringPtr("runid1"), + }, + Row: nosqlplugin.CurrentWorkflowRow{ + RunID: "runid1", + CreateRequestID: "createRequestID1", + State: persistence.WorkflowStateCreated, + CloseStatus: persistence.WorkflowCloseStatusNone, + }, + }, + wantQueries: []string{ + `UPDATE executions USING TTL 0 SET ` + + `current_run_id = runid1, ` + + `execution = {run_id: runid1, create_request_id: createRequestID1, state: 0, close_status: 0}, ` + + `workflow_last_write_version = 0, workflow_state = 0 ` + + `WHERE ` + + `shard_id = 1000 and type = 1 and domain_id = domain1 and workflow_id = workflow1 and ` + + `run_id = 30000000-0000-f000-f000-000000000001 and visibility_ts = 946684800000 and task_id = -10 ` + + `IF current_run_id = runid1 `, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + batch := &fakeBatch{} + + err := createOrUpdateCurrentWorkflow(batch, tc.shardID, tc.domainID, tc.workflowID, tc.execution) + gotErr := (err != nil) + if gotErr != tc.wantErr { + t.Fatalf("Got error: %v, want?: %v", err, tc.wantErr) + } + if gotErr { + return + } + + if diff := cmp.Diff(tc.wantQueries, batch.queries); diff != "" { + t.Fatalf("Query mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestMustConvertToSlice(t *testing.T) { + tests := []struct { + desc string + in interface{} + want []interface{} + wantPanic bool + }{ + { + desc: "nil", + in: nil, + wantPanic: true, + }, + { + desc: "empty", + in: []string{}, + want: []interface{}{}, + }, + { + desc: "slice", + in: []string{"a", "b", "c"}, + want: []interface{}{"a", "b", "c"}, + }, + { + desc: "array", + in: [3]string{"a", "b", "c"}, + want: []interface{}{"a", "b", "c"}, + }, + { + desc: "non-slice", + in: "a", + wantPanic: true, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + defer func() { + r := recover() + if (r != nil) != tc.wantPanic { + t.Fatalf("Got panic: %v, want panic?: %v", r, 
tc.wantPanic) + } + }() + + got := mustConvertToSlice(tc.in) + if diff := cmp.Diff(tc.want, got); diff != "" { + t.Fatalf("Slice mismatch (-want +got):\n%s", diff) + } + }) + } +} + func errDiff(want, got error) string { wantCondFailure, wantOk := want.(*nosqlplugin.WorkflowOperationConditionFailure) gotCondFailure, gotOk := got.(*nosqlplugin.WorkflowOperationConditionFailure)
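Reviewer note on the workflow_cql.go spacing fixes above: these templates are assembled by string concatenation, so a missing trailing space fuses the bind marker with the next keyword. A minimal sketch of the effect, using hypothetical fragments rather than the actual Cadence constants:

package main

import "fmt"

func main() {
	// Before the fix: no trailing space at the end of the first fragment.
	broken := `UPDATE executions SET timer_map = ?` +
		`WHERE shard_id = ?`
	// After the fix: the trailing space keeps the tokens separated.
	fixed := `UPDATE executions SET timer_map = ? ` +
		`WHERE shard_id = ?`
	fmt.Println(broken) // UPDATE executions SET timer_map = ?WHERE shard_id = ?
	fmt.Println(fixed)  // UPDATE executions SET timer_map = ? WHERE shard_id = ?
}

The added spaces also keep the queries rendered by the fakeBatch test helper readable, since its %v substitution would otherwise glue the substituted value onto the following keyword.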
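Reviewer note on the fakeBatch.Query change: dereferencing pointer arguments and pinning time.Time values to UTC RFC3339 is what makes the recorded queries deterministic and comparable with cmp.Diff. A self-contained sketch of the same idea (renderQuery is a hypothetical stand-in for the test helper, not part of the PR):

package main

import (
	"fmt"
	"reflect"
	"strings"
	"time"
)

// renderQuery substitutes args into a CQL template the way fakeBatch.Query
// does: non-nil pointers are replaced by the values they point to, and
// time.Time is formatted as UTC RFC3339 so the result never depends on the
// local time zone.
func renderQuery(tmpl string, args ...interface{}) string {
	sanitized := make([]interface{}, len(args))
	for i, arg := range args {
		if v := reflect.ValueOf(arg); v.Kind() == reflect.Ptr && !v.IsNil() {
			sanitized[i] = v.Elem().Interface()
		} else {
			sanitized[i] = arg
		}
		if t, ok := sanitized[i].(time.Time); ok {
			sanitized[i] = t.UTC().Format(time.RFC3339)
		}
	}
	return fmt.Sprintf(strings.ReplaceAll(tmpl, "?", "%v"), sanitized...)
}

func main() {
	nextEventID := int64(10)
	ts := time.Date(2023, 12, 12, 22, 8, 41, 0, time.UTC)
	fmt.Println(renderQuery(`IF next_event_id = ? and visibility_ts = ?`, &nextEventID, ts))
	// Output: IF next_event_id = 10 and visibility_ts = 2023-12-12T22:08:41Z
}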
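Reviewer note on the mustConvertToSlice change: including the dynamic type in the panic message makes the failure self-explanatory when a non-slice value sneaks in. A sketch of the contract that TestMustConvertToSlice exercises (an illustrative re-implementation, not the actual workflow_utils.go code):

package main

import (
	"fmt"
	"reflect"
)

// mustConvertToSliceSketch accepts slices and arrays and panics on anything
// else. reflect.ValueOf(nil) yields an invalid Value whose Kind is neither
// Slice nor Array, which is why the nil case in the test expects a panic.
func mustConvertToSliceSketch(value interface{}) []interface{} {
	v := reflect.ValueOf(value)
	switch v.Kind() {
	case reflect.Slice, reflect.Array:
		result := make([]interface{}, v.Len())
		for i := 0; i < v.Len(); i++ {
			result[i] = v.Index(i).Interface()
		}
		return result
	default:
		panic(fmt.Sprintf("Unable to convert %v of type %T to slice", value, value))
	}
}

func main() {
	fmt.Println(mustConvertToSliceSketch([3]string{"a", "b", "c"})) // [a b c]

	defer func() {
		fmt.Println("recovered:", recover())
		// recovered: Unable to convert a of type string to slice
	}()
	mustConvertToSliceSketch("a")
}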