Skip to content

Commit

Permalink
Support pagination for SQL GetTransfer/VisibilityTasks (#2564)
Browse files Browse the repository at this point in the history
- support pagination for sql GetTransfer/VisibilityTasks
- improve pagination for sql GetTimer/ReplicationTasks
- new ExecutionMutableStateTaskSuite suite
  • Loading branch information
yycptt authored Mar 4, 2022
1 parent 1a89465 commit 441f7e0
Show file tree
Hide file tree
Showing 12 changed files with 522 additions and 72 deletions.
131 changes: 85 additions & 46 deletions common/persistence/sql/execution_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common/persistence"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
Expand All @@ -60,8 +59,8 @@ func (m *sqlExecutionStore) AddHistoryTasks(
}

func (m *sqlExecutionStore) GetHistoryTask(
request *persistence.GetHistoryTaskRequest,
) (*persistence.InternalGetHistoryTaskResponse, error) {
request *p.GetHistoryTaskRequest,
) (*p.InternalGetHistoryTaskResponse, error) {
switch request.TaskCategory.ID() {
case tasks.CategoryIDTransfer:
return m.getTransferTask(request)
Expand All @@ -77,8 +76,8 @@ func (m *sqlExecutionStore) GetHistoryTask(
}

func (m *sqlExecutionStore) GetHistoryTasks(
request *persistence.GetHistoryTasksRequest,
) (*persistence.InternalGetHistoryTasksResponse, error) {
request *p.GetHistoryTasksRequest,
) (*p.InternalGetHistoryTasksResponse, error) {
switch request.TaskCategory.ID() {
case tasks.CategoryIDTransfer:
return m.getTransferTasks(request)
Expand All @@ -94,7 +93,7 @@ func (m *sqlExecutionStore) GetHistoryTasks(
}

func (m *sqlExecutionStore) CompleteHistoryTask(
request *persistence.CompleteHistoryTaskRequest,
request *p.CompleteHistoryTaskRequest,
) error {
switch request.TaskCategory.ID() {
case tasks.CategoryIDTransfer:
Expand All @@ -111,7 +110,7 @@ func (m *sqlExecutionStore) CompleteHistoryTask(
}

func (m *sqlExecutionStore) RangeCompleteHistoryTasks(
request *persistence.RangeCompleteHistoryTasksRequest,
request *p.RangeCompleteHistoryTasksRequest,
) error {
switch request.TaskCategory.ID() {
case tasks.CategoryIDTransfer:
Expand All @@ -128,8 +127,8 @@ func (m *sqlExecutionStore) RangeCompleteHistoryTasks(
}

func (m *sqlExecutionStore) getTransferTask(
request *persistence.GetHistoryTaskRequest,
) (*persistence.InternalGetHistoryTaskResponse, error) {
request *p.GetHistoryTaskRequest,
) (*p.InternalGetHistoryTaskResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
rows, err := m.Db.SelectFromTransferTasks(ctx, sqlplugin.TransferTasksFilter{
Expand All @@ -148,32 +147,50 @@ func (m *sqlExecutionStore) getTransferTask(
}

transferRow := rows[0]
resp := &persistence.InternalGetHistoryTaskResponse{
Task: *persistence.NewDataBlob(transferRow.Data, transferRow.DataEncoding),
resp := &p.InternalGetHistoryTaskResponse{
Task: *p.NewDataBlob(transferRow.Data, transferRow.DataEncoding),
}
return resp, nil
}

// TODO: pagination
func (m *sqlExecutionStore) getTransferTasks(
request *p.GetHistoryTasksRequest,
) (*p.InternalGetHistoryTasksResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
inclusiveMinTaskID, exclusiveMaxTaskID, err := getImmediateTaskReadRange(request)
if err != nil {
return nil, err
}

rows, err := m.Db.RangeSelectFromTransferTasks(ctx, sqlplugin.TransferTasksRangeFilter{
ShardID: request.ShardID,
InclusiveMinTaskID: request.InclusiveMinTaskKey.TaskID,
ExclusiveMaxTaskID: request.ExclusiveMaxTaskKey.TaskID,
InclusiveMinTaskID: inclusiveMinTaskID,
ExclusiveMaxTaskID: exclusiveMaxTaskID,
PageSize: request.BatchSize,
})
if err != nil {
if err != sql.ErrNoRows {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("GetTransferTasks operation failed. Select failed. Error: %v", err))
}
}
resp := &p.InternalGetHistoryTasksResponse{Tasks: make([]commonpb.DataBlob, len(rows))}
resp := &p.InternalGetHistoryTasksResponse{
Tasks: make([]commonpb.DataBlob, len(rows)),
}
if len(rows) == 0 {
return resp, nil
}

for i, row := range rows {
resp.Tasks[i] = *persistence.NewDataBlob(row.Data, row.DataEncoding)
resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding)
}
if len(rows) == request.BatchSize {
resp.NextPageToken = getImmediateTaskNextPageToken(
rows[len(rows)-1].TaskID,
exclusiveMaxTaskID,
)
}

return resp, nil
}

Expand Down Expand Up @@ -207,8 +224,8 @@ func (m *sqlExecutionStore) rangeCompleteTransferTasks(
}

func (m *sqlExecutionStore) getTimerTask(
request *persistence.GetHistoryTaskRequest,
) (*persistence.InternalGetHistoryTaskResponse, error) {
request *p.GetHistoryTaskRequest,
) (*p.InternalGetHistoryTaskResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
rows, err := m.Db.SelectFromTimerTasks(ctx, sqlplugin.TimerTasksFilter{
Expand All @@ -228,7 +245,7 @@ func (m *sqlExecutionStore) getTimerTask(
}

timerRow := rows[0]
resp := &persistence.InternalGetHistoryTaskResponse{
resp := &p.InternalGetHistoryTaskResponse{
Task: *p.NewDataBlob(timerRow.Data, timerRow.DataEncoding),
}
return resp, nil
Expand All @@ -251,7 +268,7 @@ func (m *sqlExecutionStore) getTimerTasks(
InclusiveMinVisibilityTimestamp: pageToken.Timestamp,
InclusiveMinTaskID: pageToken.TaskID,
ExclusiveMaxVisibilityTimestamp: request.ExclusiveMaxTaskKey.FireTime,
PageSize: request.BatchSize + 1,
PageSize: request.BatchSize,
})

if err != nil && err != sql.ErrNoRows {
Expand All @@ -263,14 +280,11 @@ func (m *sqlExecutionStore) getTimerTasks(
resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding)
}

// above use page size + 1 for query, so if this check is true
// there is definitely a next page
if len(resp.Tasks) > request.BatchSize {
if len(resp.Tasks) == request.BatchSize {
pageToken = &timerTaskPageToken{
TaskID: rows[request.BatchSize].TaskID,
Timestamp: rows[request.BatchSize].VisibilityTimestamp,
TaskID: rows[request.BatchSize-1].TaskID + 1,
Timestamp: rows[request.BatchSize-1].VisibilityTimestamp,
}
resp.Tasks = resp.Tasks[:request.BatchSize]
nextToken, err := pageToken.serialize()
if err != nil {
return nil, serviceerror.NewInternal(fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err))
Expand Down Expand Up @@ -314,8 +328,8 @@ func (m *sqlExecutionStore) rangeCompleteTimerTasks(
}

func (m *sqlExecutionStore) getReplicationTask(
request *persistence.GetHistoryTaskRequest,
) (*persistence.InternalGetHistoryTaskResponse, error) {
request *p.GetHistoryTaskRequest,
) (*p.InternalGetHistoryTaskResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
rows, err := m.Db.SelectFromReplicationTasks(ctx, sqlplugin.ReplicationTasksFilter{
Expand All @@ -334,7 +348,7 @@ func (m *sqlExecutionStore) getReplicationTask(
}

replicationRow := rows[0]
resp := &persistence.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(replicationRow.Data, replicationRow.DataEncoding)}
resp := &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(replicationRow.Data, replicationRow.DataEncoding)}
return resp, nil
}

Expand All @@ -357,7 +371,7 @@ func (m *sqlExecutionStore) getReplicationTasks(

switch err {
case nil:
return m.populateGetReplicationTasksResponse(rows, request.ExclusiveMaxTaskKey.TaskID)
return m.populateGetReplicationTasksResponse(rows, request.ExclusiveMaxTaskKey.TaskID, request.BatchSize)
case sql.ErrNoRows:
return &p.InternalGetHistoryTasksResponse{}, nil
default:
Expand Down Expand Up @@ -393,6 +407,7 @@ func getImmediateTaskNextPageToken(
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
rows []sqlplugin.ReplicationTasksRow,
exclusiveMaxTaskID int64,
batchSize int,
) (*p.InternalGetHistoryTasksResponse, error) {
if len(rows) == 0 {
return &p.InternalGetHistoryTasksResponse{}, nil
Expand All @@ -402,19 +417,23 @@ func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
for i, row := range rows {
tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding)
}

return &p.InternalGetHistoryTasksResponse{
Tasks: tasks,
NextPageToken: getImmediateTaskNextPageToken(
var nextPageToken []byte
if len(rows) == batchSize {
nextPageToken = getImmediateTaskNextPageToken(
rows[len(rows)-1].TaskID,
exclusiveMaxTaskID,
),
)
}
return &p.InternalGetHistoryTasksResponse{
Tasks: tasks,
NextPageToken: nextPageToken,
}, nil
}

func (m *sqlExecutionStore) populateGetReplicationDLQTasksResponse(
rows []sqlplugin.ReplicationDLQTasksRow,
exclusiveMaxTaskID int64,
batchSize int,
) (*p.InternalGetHistoryTasksResponse, error) {
if len(rows) == 0 {
return &p.InternalGetHistoryTasksResponse{}, nil
Expand All @@ -425,9 +444,11 @@ func (m *sqlExecutionStore) populateGetReplicationDLQTasksResponse(
tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding)
}
var nextPageToken []byte
lastTaskID := rows[len(rows)-1].TaskID
if lastTaskID+1 < exclusiveMaxTaskID {
nextPageToken = serializePageToken(lastTaskID + 1)
if len(rows) == batchSize {
nextPageToken = getImmediateTaskNextPageToken(
rows[len(rows)-1].TaskID,
exclusiveMaxTaskID,
)
}
return &p.InternalGetHistoryTasksResponse{
Tasks: tasks,
Expand Down Expand Up @@ -513,7 +534,7 @@ func (m *sqlExecutionStore) GetReplicationTasksFromDLQ(

switch err {
case nil:
return m.populateGetReplicationDLQTasksResponse(rows, request.ExclusiveMaxTaskKey.TaskID)
return m.populateGetReplicationDLQTasksResponse(rows, request.ExclusiveMaxTaskKey.TaskID, request.BatchSize)
case sql.ErrNoRows:
return &p.InternalGetHistoryTasksResponse{}, nil
default:
Expand Down Expand Up @@ -553,8 +574,8 @@ func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
}

func (m *sqlExecutionStore) getVisibilityTask(
request *persistence.GetHistoryTaskRequest,
) (*persistence.InternalGetHistoryTaskResponse, error) {
request *p.GetHistoryTaskRequest,
) (*p.InternalGetHistoryTaskResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
rows, err := m.Db.SelectFromVisibilityTasks(ctx, sqlplugin.VisibilityTasksFilter{
Expand All @@ -573,30 +594,48 @@ func (m *sqlExecutionStore) getVisibilityTask(
}

visibilityRow := rows[0]
resp := &persistence.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(visibilityRow.Data, visibilityRow.DataEncoding)}
resp := &p.InternalGetHistoryTaskResponse{Task: *p.NewDataBlob(visibilityRow.Data, visibilityRow.DataEncoding)}
return resp, nil
}

// TODO: pagination
func (m *sqlExecutionStore) getVisibilityTasks(
request *p.GetHistoryTasksRequest,
) (*p.InternalGetHistoryTasksResponse, error) {
ctx, cancel := newExecutionContext()
defer cancel()
inclusiveMinTaskID, exclusiveMaxTaskID, err := getImmediateTaskReadRange(request)
if err != nil {
return nil, err
}

rows, err := m.Db.RangeSelectFromVisibilityTasks(ctx, sqlplugin.VisibilityTasksRangeFilter{
ShardID: request.ShardID,
InclusiveMinTaskID: request.InclusiveMinTaskKey.TaskID,
ExclusiveMaxTaskID: request.ExclusiveMaxTaskKey.TaskID,
InclusiveMinTaskID: inclusiveMinTaskID,
ExclusiveMaxTaskID: exclusiveMaxTaskID,
PageSize: request.BatchSize,
})
if err != nil {
if err != sql.ErrNoRows {
return nil, serviceerror.NewUnavailable(fmt.Sprintf("GetVisibilityTasks operation failed. Select failed. Error: %v", err))
}
}
resp := &p.InternalGetHistoryTasksResponse{Tasks: make([]commonpb.DataBlob, len(rows))}
resp := &p.InternalGetHistoryTasksResponse{
Tasks: make([]commonpb.DataBlob, len(rows)),
}
if len(rows) == 0 {
return resp, nil
}

for i, row := range rows {
resp.Tasks[i] = *p.NewDataBlob(row.Data, row.DataEncoding)
}
if len(rows) == request.BatchSize {
resp.NextPageToken = getImmediateTaskNextPageToken(
rows[len(rows)-1].TaskID,
exclusiveMaxTaskID,
)
}

return resp, nil
}

Expand Down
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlplugin/history_transfer_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
ShardID int32
InclusiveMinTaskID int64
ExclusiveMaxTaskID int64
PageSize int
}

// HistoryTransferTask is the SQL persistence interface for history transfer tasks
Expand All @@ -63,6 +64,7 @@ type (
// DeleteFromTransferTasks deletes one rows from transfer_tasks table.
DeleteFromTransferTasks(ctx context.Context, filter TransferTasksFilter) (sql.Result, error)
// RangeDeleteFromTransferTasks deletes one or more rows from transfer_tasks table.
// TransferTasksRangeFilter - {PageSize} will be ignored
RangeDeleteFromTransferTasks(ctx context.Context, filter TransferTasksRangeFilter) (sql.Result, error)
}
)
2 changes: 2 additions & 0 deletions common/persistence/sql/sqlplugin/history_visibility_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
ShardID int32
InclusiveMinTaskID int64
ExclusiveMaxTaskID int64
PageSize int
}

// HistoryVisibilityTask is the SQL persistence interface for history visibility tasks
Expand All @@ -63,6 +64,7 @@ type (
// DeleteFromVisibilityTasks deletes one rows from visibility_tasks table.
DeleteFromVisibilityTasks(ctx context.Context, filter VisibilityTasksFilter) (sql.Result, error)
// RangeDeleteFromVisibilityTasks deletes one or more rows from visibility_tasks table.
// VisibilityTasksRangeFilter - {PageSize} will be ignored
RangeDeleteFromVisibilityTasks(ctx context.Context, filter VisibilityTasksRangeFilter) (sql.Result, error)
}
)
6 changes: 4 additions & 2 deletions common/persistence/sql/sqlplugin/mysql/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ workflow_id = :workflow_id
getTransferTaskQuery = `SELECT task_id, data, data_encoding
FROM transfer_tasks WHERE shard_id = ? AND task_id = ?`
getTransferTasksQuery = `SELECT task_id, data, data_encoding
FROM transfer_tasks WHERE shard_id = ? AND task_id >= ? AND task_id < ? ORDER BY task_id`
FROM transfer_tasks WHERE shard_id = ? AND task_id >= ? AND task_id < ? ORDER BY task_id LIMIT ?`

deleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id = ?`
rangeDeleteTransferTaskQuery = `DELETE FROM transfer_tasks WHERE shard_id = ? AND task_id >= ? AND task_id < ?`
Expand Down Expand Up @@ -136,7 +136,7 @@ ORDER BY task_id LIMIT ?`
getVisibilityTaskQuery = `SELECT task_id, data, data_encoding
FROM visibility_tasks WHERE shard_id = ? AND task_id = ?`
getVisibilityTasksQuery = `SELECT task_id, data, data_encoding
FROM visibility_tasks WHERE shard_id = ? AND task_id >= ? AND task_id < ? ORDER BY task_id`
FROM visibility_tasks WHERE shard_id = ? AND task_id >= ? AND task_id < ? ORDER BY task_id LIMIT ?`

deleteVisibilityTaskQuery = `DELETE FROM visibility_tasks WHERE shard_id = ? AND task_id = ?`
rangeDeleteVisibilityTaskQuery = `DELETE FROM visibility_tasks WHERE shard_id = ? AND task_id >= ? AND task_id < ?`
Expand Down Expand Up @@ -390,6 +390,7 @@ func (mdb *db) RangeSelectFromTransferTasks(
filter.ShardID,
filter.InclusiveMinTaskID,
filter.ExclusiveMaxTaskID,
filter.PageSize,
); err != nil {
return nil, err
}
Expand Down Expand Up @@ -744,6 +745,7 @@ func (mdb *db) RangeSelectFromVisibilityTasks(
filter.ShardID,
filter.InclusiveMinTaskID,
filter.ExclusiveMaxTaskID,
filter.PageSize,
); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 441f7e0

Please sign in to comment.