Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix update sticky task list in task store #3761

Merged
merged 3 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 48 additions & 30 deletions common/persistence/cassandra/cassandraTaskPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,23 @@ const (
`and task_id = ? ` +
`IF range_id = ?`

templateUpdateTaskListQueryWithTTL = `INSERT INTO tasks (` +
templateUpdateTaskListQueryWithTTLPart1 = ` INSERT INTO tasks (` +
`domain_id, ` +
`task_list_name, ` +
`task_list_type, ` +
`type, ` +
`task_id, ` +
`range_id, ` +
`task_list ` +
`) VALUES (?, ?, ?, ?, ?, ?, ` + templateTaskListType + `) USING TTL ?`
`task_id ` +
`) VALUES (?, ?, ?, ?, ?) USING TTL ?`

templateUpdateTaskListQueryWithTTLPart2 = `UPDATE tasks USING TTL ? SET ` +
`range_id = ?, ` +
`task_list = ` + templateTaskListType + " " +
`WHERE domain_id = ? ` +
`and task_list_name = ? ` +
`and task_list_type = ? ` +
`and type = ? ` +
`and task_id = ? ` +
`IF range_id = ?`

templateDeleteTaskListQuery = `DELETE FROM tasks ` +
`WHERE domain_id = ? ` +
Expand Down Expand Up @@ -268,47 +276,57 @@ func (d *cassandraTaskPersistence) UpdateTaskList(
) (*p.UpdateTaskListResponse, error) {
tli := request.TaskListInfo

var applied bool
var err error
previous := make(map[string]interface{})
if tli.Kind == p.TaskListKindSticky { // if task_list is sticky, then update with TTL
query := d.session.Query(templateUpdateTaskListQueryWithTTL,
batch := d.session.NewBatch(gocql.LoggedBatch)
// part 1 is used to set TTL on primary key as UPDATE can't set TTL for primary key
batch.Query(templateUpdateTaskListQueryWithTTLPart1,
tli.DomainID,
&tli.Name,
tli.TaskType,
rowTypeTaskList,
taskListTaskID,
stickyTaskListTTL,
)
// part 2 is for CAS and setting TTL for the rest of the columns
batch.Query(templateUpdateTaskListQueryWithTTLPart2,
stickyTaskListTTL,
tli.RangeID,
tli.DomainID,
&tli.Name,
tli.TaskType,
tli.AckLevel,
tli.Kind,
time.Now(),
stickyTaskListTTL,
tli.DomainID,
&tli.Name,
tli.TaskType,
rowTypeTaskList,
taskListTaskID,
tli.RangeID,
)
err := query.Exec()
if err != nil {
return nil, convertCommonErrors(nil, "UpdateTaskList", err)
}
return &p.UpdateTaskListResponse{}, nil
applied, _, err = d.session.MapExecuteBatchCAS(batch, previous)
} else {
query := d.session.Query(templateUpdateTaskListQuery,
tli.RangeID,
tli.DomainID,
&tli.Name,
tli.TaskType,
tli.AckLevel,
tli.Kind,
time.Now(),
tli.DomainID,
&tli.Name,
tli.TaskType,
rowTypeTaskList,
taskListTaskID,
tli.RangeID,
)
applied, err = query.MapScanCAS(previous)
}

query := d.session.Query(templateUpdateTaskListQuery,
tli.RangeID,
tli.DomainID,
&tli.Name,
tli.TaskType,
tli.AckLevel,
tli.Kind,
time.Now(),
tli.DomainID,
&tli.Name,
tli.TaskType,
rowTypeTaskList,
taskListTaskID,
tli.RangeID,
)

previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
return nil, convertCommonErrors(nil, "UpdateTaskList", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,14 +390,14 @@ func (s *MatchingPersistenceSuite) TestLeaseAndUpdateTaskListSticky() {
DomainID: domainID,
Name: taskList,
TaskType: p.TaskListTypeDecision,
RangeID: 2,
RangeID: tli.RangeID,
AckLevel: 0,
Kind: p.TaskListKindSticky,
}
_, err = s.TaskMgr.UpdateTaskList(ctx, &p.UpdateTaskListRequest{
TaskListInfo: taskListInfo,
})
s.NoError(err) // because update with ttl doesn't check rangeID
s.NoError(err)
}

func (s *MatchingPersistenceSuite) deleteAllTaskList() {
Expand Down
30 changes: 1 addition & 29 deletions common/persistence/sql/sqlTaskManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,36 +211,8 @@ func (m *sqlTaskManager) UpdateTaskList(
}
if request.TaskListInfo.Kind == persistence.TaskListKindSticky {
tlInfo.ExpiryTimeNanos = common.Int64Ptr(stickyTaskListExpiry().UnixNano())
blob, err := m.parser.TaskListInfoToBlob(tlInfo)
if err != nil {
return nil, err
}
row := &sqlplugin.TaskListsRow{
ShardID: shardID,
DomainID: domainID,
RangeID: request.TaskListInfo.RangeID,
Name: request.TaskListInfo.Name,
TaskType: int64(request.TaskListInfo.TaskType),
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
if m.db.SupportsTTL() {
if _, err := m.db.ReplaceIntoTaskListsWithTTL(ctx, &sqlplugin.TaskListsRowWithTTL{
TaskListsRow: *row,
TTL: stickyTasksListsTTL,
}); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("UpdateTaskList operation failed. Failed to make sticky task list. Error: %v", err),
}
}
} else {
if _, err := m.db.ReplaceIntoTaskLists(ctx, row); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("UpdateTaskList operation failed. Failed to make sticky task list. Error: %v", err),
}
}
}
}

var resp *persistence.UpdateTaskListResponse
blob, err := m.parser.TaskListInfoToBlob(tlInfo)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions common/persistence/sql/sqlplugin/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,13 +555,11 @@ type (
// - {domainID, tasklistName, taskType, taskID}
// to delete multiple rows
// - {domainID, tasklistName, taskType, taskIDLessThanEquals, limit }
// - this will delete upto limit number of tasks less than or equal to the given task id
// - this will delete up to limit number of tasks less than or equal to the given task id
DeleteFromTasks(ctx context.Context, filter *TasksFilter) (sql.Result, error)

InsertIntoTaskLists(ctx context.Context, row *TaskListsRow) (sql.Result, error)
InsertIntoTaskListsWithTTL(ctx context.Context, row *TaskListsRowWithTTL) (sql.Result, error)
ReplaceIntoTaskLists(ctx context.Context, row *TaskListsRow) (sql.Result, error)
ReplaceIntoTaskListsWithTTL(ctx context.Context, row *TaskListsRowWithTTL) (sql.Result, error)
UpdateTaskLists(ctx context.Context, row *TaskListsRow) (sql.Result, error)
UpdateTaskListsWithTTL(ctx context.Context, row *TaskListsRowWithTTL) (sql.Result, error)
// SelectFromTaskLists returns one or more rows from task_lists table
Expand Down
12 changes: 0 additions & 12 deletions common/persistence/sql/sqlplugin/mysql/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ const (
// (default range ID: initialRangeID == 1)
createTaskListQry = `INSERT ` + taskListCreatePart

replaceTaskListQry = `REPLACE ` + taskListCreatePart

updateTaskListQry = `UPDATE task_lists SET
range_id = :range_id,
data = :data,
Expand Down Expand Up @@ -122,11 +120,6 @@ func (mdb *db) InsertIntoTaskLists(ctx context.Context, row *sqlplugin.TaskLists
return mdb.conn.NamedExecContext(ctx, createTaskListQry, row)
}

// ReplaceIntoTaskLists replaces one or more rows in task_lists table
func (mdb *db) ReplaceIntoTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
return mdb.conn.NamedExecContext(ctx, replaceTaskListQry, row)
}

// UpdateTaskLists updates a row in task_lists table
func (mdb *db) UpdateTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
return mdb.conn.NamedExecContext(ctx, updateTaskListQry, row)
Expand Down Expand Up @@ -190,11 +183,6 @@ func (mdb *db) InsertIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskLi
return nil, sqlplugin.ErrTTLNotSupported
}

// ReplaceIntoTaskListsWithTTL is not supported in MySQL
func (mdb *db) ReplaceIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
return nil, sqlplugin.ErrTTLNotSupported
}

// UpdateTaskListsWithTTL is not supported in MySQL
func (mdb *db) UpdateTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
return nil, sqlplugin.ErrTTLNotSupported
Expand Down
16 changes: 0 additions & 16 deletions common/persistence/sql/sqlplugin/postgres/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ const (
// (default range ID: initialRangeID == 1)
createTaskListQry = `INSERT ` + taskListCreatePart

replaceTaskListQry = `INSERT ` + taskListCreatePart +
`ON CONFLICT (shard_id, domain_id, name, task_type) DO UPDATE
SET range_id = excluded.range_id,
data = excluded.data,
data_encoding = excluded.data_encoding`

updateTaskListQry = `UPDATE task_lists SET
range_id = :range_id,
data = :data,
Expand Down Expand Up @@ -127,11 +121,6 @@ func (pdb *db) InsertIntoTaskLists(ctx context.Context, row *sqlplugin.TaskLists
return pdb.conn.NamedExecContext(ctx, createTaskListQry, row)
}

// ReplaceIntoTaskLists replaces one or more rows in task_lists table
func (pdb *db) ReplaceIntoTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
return pdb.conn.NamedExecContext(ctx, replaceTaskListQry, row)
}

// UpdateTaskLists updates a row in task_lists table
func (pdb *db) UpdateTaskLists(ctx context.Context, row *sqlplugin.TaskListsRow) (sql.Result, error) {
return pdb.conn.NamedExecContext(ctx, updateTaskListQry, row)
Expand Down Expand Up @@ -195,11 +184,6 @@ func (pdb *db) InsertIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskLi
return nil, sqlplugin.ErrTTLNotSupported
}

// ReplaceIntoTaskListsWithTTL is not supported in Postgres
func (pdb *db) ReplaceIntoTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
return nil, sqlplugin.ErrTTLNotSupported
}

// UpdateTaskListsWithTTL is not supported in Postgres
func (pdb *db) UpdateTaskListsWithTTL(_ context.Context, _ *sqlplugin.TaskListsRowWithTTL) (sql.Result, error) {
return nil, sqlplugin.ErrTTLNotSupported
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/visibilitySamplingClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
numOfPriorityForList = 1
)

// ErrPersistenceLimitExceededForList is the error indicating QPS limit reached for list visibility.
// ErrPersistenceLimitExceededForList is the error indicating QPS limit reached for list visibility.
var ErrPersistenceLimitExceededForList = &workflow.ServiceBusyError{Message: "Persistence Max QPS Reached for List Operations."}

type visibilitySamplingClient struct {
Expand Down
2 changes: 1 addition & 1 deletion common/service/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *Collection) logValue(
c.logger.Info("Dynamic config has changed",
tag.Key(key.String()), tag.Value(value), tag.DefaultValue(loadedValue))
// update the logKeys so that we can capture the changes again
// (ignore the racing condition here because it's just for logging, we need a lock if really need to solve it)
// (ignore the racing condition here because it's just for logging, we need a lock if really need to solve it)
c.logKeys.Store(key, value)
}
}
Expand Down