Skip to content

Commit

Permalink
Update read DLQ messages API to return raw task info (#3869)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Jan 12, 2021
1 parent 405b334 commit 8b63df7
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 48 deletions.
56 changes: 49 additions & 7 deletions .gen/go/replicator/replicator.go

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions common/types/mapper/thrift/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,10 @@ func FromReadDLQMessagesResponse(t *types.ReadDLQMessagesResponse) *replicator.R
return nil
}
return &replicator.ReadDLQMessagesResponse{
Type: FromDLQType(t.Type),
ReplicationTasks: FromReplicationTaskArray(t.ReplicationTasks),
NextPageToken: t.NextPageToken,
Type: FromDLQType(t.Type),
ReplicationTasks: FromReplicationTaskArray(t.ReplicationTasks),
ReplicationTasksInfo: FromReplicationTaskInfoArray(t.ReplicationTasksInfo),
NextPageToken: t.NextPageToken,
}
}

Expand All @@ -450,9 +451,10 @@ func ToReadDLQMessagesResponse(t *replicator.ReadDLQMessagesResponse) *types.Rea
return nil
}
return &types.ReadDLQMessagesResponse{
Type: ToDLQType(t.Type),
ReplicationTasks: ToReplicationTaskArray(t.ReplicationTasks),
NextPageToken: t.NextPageToken,
Type: ToDLQType(t.Type),
ReplicationTasks: ToReplicationTaskArray(t.ReplicationTasks),
ReplicationTasksInfo: ToReplicationTaskInfoArray(t.ReplicationTasksInfo),
NextPageToken: t.NextPageToken,
}
}

Expand Down
15 changes: 12 additions & 3 deletions common/types/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,10 @@ func (v *ReadDLQMessagesRequest) GetNextPageToken() (o []byte) {

// ReadDLQMessagesResponse is an internal type (TBD...)
type ReadDLQMessagesResponse struct {
Type *DLQType `json:"type,omitempty"`
ReplicationTasks []*ReplicationTask `json:"replicationTasks,omitempty"`
NextPageToken []byte `json:"nextPageToken,omitempty"`
Type *DLQType `json:"type,omitempty"`
ReplicationTasks []*ReplicationTask `json:"replicationTasks,omitempty"`
ReplicationTasksInfo []*ReplicationTaskInfo `json:"replicationTasksInfo,omitempty"`
NextPageToken []byte `json:"nextPageToken,omitempty"`
}

// GetType is an internal getter (TBD...)
Expand All @@ -612,6 +613,14 @@ func (v *ReadDLQMessagesResponse) GetReplicationTasks() (o []*ReplicationTask) {
return
}

// GetReplicationTasksInfo is an internal getter (TBD...)
func (v *ReadDLQMessagesResponse) GetReplicationTasksInfo() (o []*ReplicationTaskInfo) {
if v != nil && v.ReplicationTasksInfo != nil {
return v.ReplicationTasksInfo
}
return
}

// GetNextPageToken is an internal getter (TBD...)
func (v *ReadDLQMessagesResponse) GetNextPageToken() (o []byte) {
if v != nil && v.NextPageToken != nil {
Expand Down
9 changes: 5 additions & 4 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3248,7 +3248,7 @@ func (e *historyEngineImpl) ReadDLQMessages(
request *types.ReadDLQMessagesRequest,
) (*types.ReadDLQMessagesResponse, error) {

tasks, token, err := e.replicationDLQHandler.ReadMessages(
tasks, taskInfo, token, err := e.replicationDLQHandler.ReadMessages(
ctx,
request.GetSourceCluster(),
request.GetInclusiveEndMessageID(),
Expand All @@ -3259,9 +3259,10 @@ func (e *historyEngineImpl) ReadDLQMessages(
return nil, err
}
return &types.ReadDLQMessagesResponse{
Type: request.GetType().Ptr(),
ReplicationTasks: tasks,
NextPageToken: token,
Type: request.GetType().Ptr(),
ReplicationTasks: tasks,
ReplicationTasksInfo: taskInfo,
NextPageToken: token,
}, nil
}

Expand Down
16 changes: 8 additions & 8 deletions service/history/replication/dlq_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type (
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*types.ReplicationTask, []byte, error)
) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error)
PurgeMessages(
ctx context.Context,
sourceCluster string,
Expand Down Expand Up @@ -96,7 +96,7 @@ func (r *dlqHandlerImpl) ReadMessages(
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*types.ReplicationTask, []byte, error) {
) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error) {

return r.readMessagesWithAckLevel(
ctx,
Expand All @@ -113,7 +113,7 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel(
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*types.ReplicationTask, []byte, error) {
) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error) {

resp, err := r.shard.GetExecutionManager().GetReplicationTasksFromDLQ(
ctx,
Expand All @@ -128,12 +128,12 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel(
},
)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}

remoteAdminClient := r.shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster)
if remoteAdminClient == nil {
return nil, nil, errInvalidCluster
return nil, nil, nil, errInvalidCluster
}

taskInfo := make([]*types.ReplicationTaskInfo, 0, len(resp.Tasks))
Expand All @@ -159,11 +159,11 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel(
},
)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
}

return response.ReplicationTasks, resp.NextPageToken, nil
return response.ReplicationTasks, taskInfo, resp.NextPageToken, nil
}

func (r *dlqHandlerImpl) PurgeMessages(
Expand Down Expand Up @@ -198,7 +198,7 @@ func (r *dlqHandlerImpl) MergeMessages(
return nil, errInvalidCluster
}

tasks, token, err := r.readMessagesWithAckLevel(
tasks, _, token, err := r.readMessagesWithAckLevel(
ctx,
sourceCluster,
lastMessageID,
Expand Down
9 changes: 5 additions & 4 deletions service/history/replication/dlq_handler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion service/history/replication/dlq_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,12 @@ func (s *dlqHandlerSuite) TestReadMessages_OK() {
s.adminClient.EXPECT().
GetDLQReplicationMessages(ctx, gomock.Any()).
Return(&types.GetDLQReplicationMessagesResponse{}, nil)
tasks, token, err := s.messageHandler.ReadMessages(ctx, s.sourceCluster, lastMessageID, pageSize, pageToken)
tasks, info, token, err := s.messageHandler.ReadMessages(ctx, s.sourceCluster, lastMessageID, pageSize, pageToken)
s.NoError(err)
s.Nil(token)
s.Equal(resp.Tasks[0].GetDomainID(), info[0].GetDomainID())
s.Equal(resp.Tasks[0].GetWorkflowID(), info[0].GetWorkflowID())
s.Equal(resp.Tasks[0].GetRunID(), info[0].GetRunID())
s.Nil(tasks)
}

Expand Down
4 changes: 4 additions & 0 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,10 @@ func newAdminDLQCommands() []cli.Command {
Name: FlagOutputFilenameWithAlias,
Usage: "Output file to write to, if not provided output is written to stdout",
},
cli.BoolFlag{
Name: FlagDLQRawTask,
Usage: "Show DLQ raw task information",
},
},
Action: func(c *cli.Context) {
AdminGetDLQMessages(c)
Expand Down
48 changes: 33 additions & 15 deletions tools/cli/adminDLQCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
package cli

import (
"bufio"
"encoding/json"
"fmt"
"os"
"time"

"github.com/urfave/cli"
Expand Down Expand Up @@ -51,6 +50,8 @@ func AdminGetDLQMessages(c *cli.Context) {
outputFile := getOutputFile(c.String(FlagOutputFilename))
defer outputFile.Close()

showRawTask := c.Bool(FlagDLQRawTask)
var rawTasksInfo []*types.ReplicationTaskInfo
remainingMessageCount := common.EndMessageID
if c.IsSet(FlagMaxMessageCount) {
remainingMessageCount = c.Int64(FlagMaxMessageCount)
Expand All @@ -76,6 +77,10 @@ func AdminGetDLQMessages(c *cli.Context) {
for _, item := range resp.GetReplicationTasks() {
paginateItems = append(paginateItems, item)
}
if showRawTask {
rawTasksInfo = append(rawTasksInfo, resp.GetReplicationTasksInfo()...)
}

return paginateItems, resp.GetNextPageToken(), err
}

Expand All @@ -100,6 +105,31 @@ func AdminGetDLQMessages(c *cli.Context) {
ErrorAndExit("fail to print dlq messages.", err)
}
}

if showRawTask {
_, err := outputFile.WriteString(fmt.Sprintf("#### REPLICATION DLQ RAW TASKS INFO ####\n"))
if err != nil {
ErrorAndExit("fail to print dlq raw tasks.", err)
}
for _, info := range rawTasksInfo {
str, err := json.Marshal(info)
if err != nil {
ErrorAndExit("fail to encode dlq raw tasks.", err)
}

if _, err = outputFile.WriteString(fmt.Sprintf("%v\n", string(str))); err != nil {
ErrorAndExit("fail to print dlq raw tasks.", err)
}
}
} else {
if lastReadMessageID == 0 && len(rawTasksInfo) > 0 {
if _, err := outputFile.WriteString(
fmt.Sprintf("WARN: Received empty replication task but metadata is not empty. Please use %v to show metadata task.\n", FlagDLQRawTask),
); err != nil {
ErrorAndExit("fail to print warning message.", err)
}
}
}
}

// AdminPurgeDLQMessages deletes messages from DLQ
Expand Down Expand Up @@ -156,7 +186,7 @@ func AdminMergeDLQMessages(c *cli.Context) {
for {
response, err := adminClient.MergeDLQMessages(ctx, request)
if err != nil {
fmt.Printf("Failed to merge DLQ message in shard %v with error: %v.\n", shardID, err)
ErrorAndExit(fmt.Sprintf("Failed to merge DLQ message in shard %v.", shardID), err)
}

if response == nil || len(response.NextPageToken) == 0 {
Expand All @@ -181,15 +211,3 @@ func toQueueType(dlqType string) *types.DLQType {
}
return nil
}

func confirmOrExit(message string) {
fmt.Println(message + " (Y/n)")
reader := bufio.NewReader(os.Stdin)
confirm, err := reader.ReadByte()
if err != nil {
panic(err)
}
if confirm != 'Y' {
osExit(0)
}
}
1 change: 1 addition & 0 deletions tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ const (
FlagTLSEnableHostVerification = "tls_enable_host_verification"
FlagDLQType = "dlq_type"
FlagDLQTypeWithAlias = FlagDLQType + ", dt"
FlagDLQRawTask = "dlq_raw_task"
FlagMaxMessageCount = "max_message_count"
FlagMaxMessageCountWithAlias = FlagMaxMessageCount + ", mmc"
FlagLastMessageID = "last_message_id"
Expand Down

0 comments on commit 8b63df7

Please sign in to comment.