diff --git a/.gen/go/replicator/replicator.go b/.gen/go/replicator/replicator.go index e9696fae7f2..4ee0f867f2c 100644 --- a/.gen/go/replicator/replicator.go +++ b/.gen/go/replicator/replicator.go @@ -4243,9 +4243,10 @@ func (v *ReadDLQMessagesRequest) IsSetNextPageToken() bool { } type ReadDLQMessagesResponse struct { - Type *DLQType `json:"type,omitempty"` - ReplicationTasks []*ReplicationTask `json:"replicationTasks,omitempty"` - NextPageToken []byte `json:"nextPageToken,omitempty"` + Type *DLQType `json:"type,omitempty"` + ReplicationTasks []*ReplicationTask `json:"replicationTasks,omitempty"` + NextPageToken []byte `json:"nextPageToken,omitempty"` + ReplicationTasksInfo []*ReplicationTaskInfo `json:"replicationTasksInfo,omitempty"` } // ToWire translates a ReadDLQMessagesResponse struct into a Thrift-level intermediate @@ -4265,7 +4266,7 @@ type ReadDLQMessagesResponse struct { // } func (v *ReadDLQMessagesResponse) ToWire() (wire.Value, error) { var ( - fields [3]wire.Field + fields [4]wire.Field i int = 0 w wire.Value err error @@ -4295,6 +4296,14 @@ func (v *ReadDLQMessagesResponse) ToWire() (wire.Value, error) { fields[i] = wire.Field{ID: 30, Value: w} i++ } + if v.ReplicationTasksInfo != nil { + w, err = wire.NewValueList(_List_ReplicationTaskInfo_ValueList(v.ReplicationTasksInfo)), error(nil) + if err != nil { + return w, err + } + fields[i] = wire.Field{ID: 40, Value: w} + i++ + } return wire.NewValueStruct(wire.Struct{Fields: fields[:i]}), nil } @@ -4346,6 +4355,14 @@ func (v *ReadDLQMessagesResponse) FromWire(w wire.Value) error { return err } + } + case 40: + if field.Value.Type() == wire.TList { + v.ReplicationTasksInfo, err = _List_ReplicationTaskInfo_Read(field.Value.GetList()) + if err != nil { + return err + } + } } } @@ -4360,7 +4377,7 @@ func (v *ReadDLQMessagesResponse) String() string { return "" } - var fields [3]string + var fields [4]string i := 0 if v.Type != nil { fields[i] = fmt.Sprintf("Type: %v", *(v.Type)) @@ -4374,6 +4391,10 @@ func (v *ReadDLQMessagesResponse) String() string { fields[i] = fmt.Sprintf("NextPageToken: %v", v.NextPageToken) i++ } + if v.ReplicationTasksInfo != nil { + fields[i] = fmt.Sprintf("ReplicationTasksInfo: %v", v.ReplicationTasksInfo) + i++ + } return fmt.Sprintf("ReadDLQMessagesResponse{%v}", strings.Join(fields[:i], ", ")) } @@ -4397,6 +4418,9 @@ func (v *ReadDLQMessagesResponse) Equals(rhs *ReadDLQMessagesResponse) bool { if !((v.NextPageToken == nil && rhs.NextPageToken == nil) || (v.NextPageToken != nil && rhs.NextPageToken != nil && bytes.Equal(v.NextPageToken, rhs.NextPageToken))) { return false } + if !((v.ReplicationTasksInfo == nil && rhs.ReplicationTasksInfo == nil) || (v.ReplicationTasksInfo != nil && rhs.ReplicationTasksInfo != nil && _List_ReplicationTaskInfo_Equals(v.ReplicationTasksInfo, rhs.ReplicationTasksInfo))) { + return false + } return true } @@ -4416,6 +4440,9 @@ func (v *ReadDLQMessagesResponse) MarshalLogObject(enc zapcore.ObjectEncoder) (e if v.NextPageToken != nil { enc.AddString("nextPageToken", base64.StdEncoding.EncodeToString(v.NextPageToken)) } + if v.ReplicationTasksInfo != nil { + err = multierr.Append(err, enc.AddArray("replicationTasksInfo", (_List_ReplicationTaskInfo_Zapper)(v.ReplicationTasksInfo))) + } return err } @@ -4464,6 +4491,21 @@ func (v *ReadDLQMessagesResponse) IsSetNextPageToken() bool { return v != nil && v.NextPageToken != nil } +// GetReplicationTasksInfo returns the value of ReplicationTasksInfo if it is set or its +// zero value if it is unset. +func (v *ReadDLQMessagesResponse) GetReplicationTasksInfo() (o []*ReplicationTaskInfo) { + if v != nil && v.ReplicationTasksInfo != nil { + return v.ReplicationTasksInfo + } + + return +} + +// IsSetReplicationTasksInfo returns true if ReplicationTasksInfo is not nil. +func (v *ReadDLQMessagesResponse) IsSetReplicationTasksInfo() bool { + return v != nil && v.ReplicationTasksInfo != nil +} + type ReplicationMessages struct { ReplicationTasks []*ReplicationTask `json:"replicationTasks,omitempty"` LastRetrievedMessageId *int64 `json:"lastRetrievedMessageId,omitempty"` @@ -7307,11 +7349,11 @@ var ThriftModule = &thriftreflect.ThriftModule{ Name: "replicator", Package: "github.com/uber/cadence/.gen/go/replicator", FilePath: "replicator.thrift", - SHA1: "a1386fd6cad734b4337aa7f6c76ac26af92cc9dd", + SHA1: "f3cf74d03a9d51e306e6faf89a07466feaec9846", Includes: []*thriftreflect.ThriftModule{ shared.ThriftModule, }, Raw: rawIDL, } -const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.replicator\n\ninclude \"shared.thrift\"\n\nenum ReplicationTaskType {\n Domain\n History\n SyncShardStatus\n SyncActivity\n HistoryMetadata\n HistoryV2\n FailoverMarker\n}\n\nenum DomainOperation {\n Create\n Update\n}\n\nstruct DomainTaskAttributes {\n 05: optional DomainOperation domainOperation\n 10: optional string id\n 20: optional shared.DomainInfo info\n 30: optional shared.DomainConfiguration config\n 40: optional shared.DomainReplicationConfiguration replicationConfig\n 50: optional i64 (js.type = \"Long\") configVersion\n 60: optional i64 (js.type = \"Long\") failoverVersion\n 70: optional i64 (js.type = \"Long\") previousFailoverVersion\n}\n\nstruct SyncShardStatusTaskAttributes {\n 10: optional string sourceCluster\n 20: optional i64 (js.type = \"Long\") shardId\n 30: optional i64 (js.type = \"Long\") timestamp\n}\n\nstruct SyncActivityTaskAttributes {\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional i64 (js.type = \"Long\") version\n 50: optional i64 (js.type = \"Long\") scheduledId\n 60: optional i64 (js.type = \"Long\") scheduledTime\n 70: optional i64 (js.type = \"Long\") startedId\n 80: optional i64 (js.type = \"Long\") startedTime\n 90: optional i64 (js.type = \"Long\") lastHeartbeatTime\n 100: optional binary details\n 110: optional i32 attempt\n 120: optional string lastFailureReason\n 130: optional string lastWorkerIdentity\n 140: optional binary lastFailureDetails\n 150: optional shared.VersionHistory versionHistory\n}\n\nstruct HistoryTaskV2Attributes {\n 05: optional i64 (js.type = \"Long\") taskId\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional list versionHistoryItems\n 50: optional shared.DataBlob events\n // new run events does not need version history since there is no prior events\n 70: optional shared.DataBlob newRunEvents\n}\n\nstruct FailoverMarkerAttributes{\n\t10: optional string domainID\n\t20: optional i64 (js.type = \"Long\") failoverVersion\n\t30: optional i64 (js.type = \"Long\") creationTime\n}\n\nstruct FailoverMarkers{\n\t10: optional list failoverMarkers\n}\n\nstruct ReplicationTask {\n 10: optional ReplicationTaskType taskType\n 11: optional i64 (js.type = \"Long\") sourceTaskId\n 20: optional DomainTaskAttributes domainTaskAttributes\n 40: optional SyncShardStatusTaskAttributes syncShardStatusTaskAttributes\n 50: optional SyncActivityTaskAttributes syncActivityTaskAttributes\n 70: optional HistoryTaskV2Attributes historyTaskV2Attributes\n 80: optional FailoverMarkerAttributes failoverMarkerAttributes\n 90: optional i64 (js.type = \"Long\") creationTime\n}\n\nstruct ReplicationToken {\n 10: optional i32 shardID\n // lastRetrivedMessageId is where the next fetch should begin with\n 20: optional i64 (js.type = \"Long\") lastRetrievedMessageId\n // lastProcessedMessageId is the last messageId that is processed on the passive side.\n // This can be different than lastRetrievedMessageId if passive side supports prefetching messages.\n 30: optional i64 (js.type = \"Long\") lastProcessedMessageId\n}\n\nstruct SyncShardStatus {\n 10: optional i64 (js.type = \"Long\") timestamp\n}\n\nstruct ReplicationMessages {\n 10: optional list replicationTasks\n // This can be different than the last taskId in the above list, because sender can decide to skip tasks (e.g. for completed workflows).\n 20: optional i64 (js.type = \"Long\") lastRetrievedMessageId\n 30: optional bool hasMore // Hint for flow control\n 40: optional SyncShardStatus syncShardStatus\n}\n\nstruct ReplicationTaskInfo {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional i16 taskType\n 50: optional i64 (js.type = \"Long\") taskID\n 60: optional i64 (js.type = \"Long\") version\n 70: optional i64 (js.type = \"Long\") firstEventID\n 80: optional i64 (js.type = \"Long\") nextEventID\n 90: optional i64 (js.type = \"Long\") scheduledID\n}\n\nstruct GetReplicationMessagesRequest {\n 10: optional list tokens\n 20: optional string clusterName\n}\n\nstruct GetReplicationMessagesResponse {\n 10: optional map messagesByShard\n}\n\nstruct GetDomainReplicationMessagesRequest {\n // lastRetrievedMessageId is where the next fetch should begin with\n 10: optional i64 (js.type = \"Long\") lastRetrievedMessageId\n // lastProcessedMessageId is the last messageId that is processed on the passive side.\n // This can be different than lastRetrievedMessageId if passive side supports prefetching messages.\n 20: optional i64 (js.type = \"Long\") lastProcessedMessageId\n // clusterName is the name of the pulling cluster\n 30: optional string clusterName\n}\n\nstruct GetDomainReplicationMessagesResponse {\n 10: optional ReplicationMessages messages\n}\n\nstruct GetDLQReplicationMessagesRequest {\n 10: optional list taskInfos\n}\n\nstruct GetDLQReplicationMessagesResponse {\n 10: optional list replicationTasks\n}\n\nenum DLQType {\n Replication,\n Domain,\n}\n\nstruct ReadDLQMessagesRequest{\n 10: optional DLQType type\n 20: optional i32 shardID\n 30: optional string sourceCluster\n 40: optional i64 (js.type = \"Long\") inclusiveEndMessageID\n 50: optional i32 maximumPageSize\n 60: optional binary nextPageToken\n}\n\nstruct ReadDLQMessagesResponse{\n 10: optional DLQType type\n 20: optional list replicationTasks\n 30: optional binary nextPageToken\n}\n\nstruct PurgeDLQMessagesRequest{\n 10: optional DLQType type\n 20: optional i32 shardID\n 30: optional string sourceCluster\n 40: optional i64 (js.type = \"Long\") inclusiveEndMessageID\n}\n\nstruct MergeDLQMessagesRequest{\n 10: optional DLQType type\n 20: optional i32 shardID\n 30: optional string sourceCluster\n 40: optional i64 (js.type = \"Long\") inclusiveEndMessageID\n 50: optional i32 maximumPageSize\n 60: optional binary nextPageToken\n}\n\nstruct MergeDLQMessagesResponse{\n 10: optional binary nextPageToken\n}\n" +const rawIDL = "// Copyright (c) 2017 Uber Technologies, Inc.\n//\n// Permission is hereby granted, free of charge, to any person obtaining a copy\n// of this software and associated documentation files (the \"Software\"), to deal\n// in the Software without restriction, including without limitation the rights\n// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n// copies of the Software, and to permit persons to whom the Software is\n// furnished to do so, subject to the following conditions:\n//\n// The above copyright notice and this permission notice shall be included in\n// all copies or substantial portions of the Software.\n//\n// THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n// THE SOFTWARE.\n\nnamespace java com.uber.cadence.replicator\n\ninclude \"shared.thrift\"\n\nenum ReplicationTaskType {\n Domain\n History\n SyncShardStatus\n SyncActivity\n HistoryMetadata\n HistoryV2\n FailoverMarker\n}\n\nenum DomainOperation {\n Create\n Update\n}\n\nstruct DomainTaskAttributes {\n 05: optional DomainOperation domainOperation\n 10: optional string id\n 20: optional shared.DomainInfo info\n 30: optional shared.DomainConfiguration config\n 40: optional shared.DomainReplicationConfiguration replicationConfig\n 50: optional i64 (js.type = \"Long\") configVersion\n 60: optional i64 (js.type = \"Long\") failoverVersion\n 70: optional i64 (js.type = \"Long\") previousFailoverVersion\n}\n\nstruct SyncShardStatusTaskAttributes {\n 10: optional string sourceCluster\n 20: optional i64 (js.type = \"Long\") shardId\n 30: optional i64 (js.type = \"Long\") timestamp\n}\n\nstruct SyncActivityTaskAttributes {\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional i64 (js.type = \"Long\") version\n 50: optional i64 (js.type = \"Long\") scheduledId\n 60: optional i64 (js.type = \"Long\") scheduledTime\n 70: optional i64 (js.type = \"Long\") startedId\n 80: optional i64 (js.type = \"Long\") startedTime\n 90: optional i64 (js.type = \"Long\") lastHeartbeatTime\n 100: optional binary details\n 110: optional i32 attempt\n 120: optional string lastFailureReason\n 130: optional string lastWorkerIdentity\n 140: optional binary lastFailureDetails\n 150: optional shared.VersionHistory versionHistory\n}\n\nstruct HistoryTaskV2Attributes {\n 05: optional i64 (js.type = \"Long\") taskId\n 10: optional string domainId\n 20: optional string workflowId\n 30: optional string runId\n 40: optional list versionHistoryItems\n 50: optional shared.DataBlob events\n // new run events does not need version history since there is no prior events\n 70: optional shared.DataBlob newRunEvents\n}\n\nstruct FailoverMarkerAttributes{\n\t10: optional string domainID\n\t20: optional i64 (js.type = \"Long\") failoverVersion\n\t30: optional i64 (js.type = \"Long\") creationTime\n}\n\nstruct FailoverMarkers{\n\t10: optional list failoverMarkers\n}\n\nstruct ReplicationTask {\n 10: optional ReplicationTaskType taskType\n 11: optional i64 (js.type = \"Long\") sourceTaskId\n 20: optional DomainTaskAttributes domainTaskAttributes\n 40: optional SyncShardStatusTaskAttributes syncShardStatusTaskAttributes\n 50: optional SyncActivityTaskAttributes syncActivityTaskAttributes\n 70: optional HistoryTaskV2Attributes historyTaskV2Attributes\n 80: optional FailoverMarkerAttributes failoverMarkerAttributes\n 90: optional i64 (js.type = \"Long\") creationTime\n}\n\nstruct ReplicationToken {\n 10: optional i32 shardID\n // lastRetrivedMessageId is where the next fetch should begin with\n 20: optional i64 (js.type = \"Long\") lastRetrievedMessageId\n // lastProcessedMessageId is the last messageId that is processed on the passive side.\n // This can be different than lastRetrievedMessageId if passive side supports prefetching messages.\n 30: optional i64 (js.type = \"Long\") lastProcessedMessageId\n}\n\nstruct SyncShardStatus {\n 10: optional i64 (js.type = \"Long\") timestamp\n}\n\nstruct ReplicationMessages {\n 10: optional list replicationTasks\n // This can be different than the last taskId in the above list, because sender can decide to skip tasks (e.g. for completed workflows).\n 20: optional i64 (js.type = \"Long\") lastRetrievedMessageId\n 30: optional bool hasMore // Hint for flow control\n 40: optional SyncShardStatus syncShardStatus\n}\n\nstruct ReplicationTaskInfo {\n 10: optional string domainID\n 20: optional string workflowID\n 30: optional string runID\n 40: optional i16 taskType\n 50: optional i64 (js.type = \"Long\") taskID\n 60: optional i64 (js.type = \"Long\") version\n 70: optional i64 (js.type = \"Long\") firstEventID\n 80: optional i64 (js.type = \"Long\") nextEventID\n 90: optional i64 (js.type = \"Long\") scheduledID\n}\n\nstruct GetReplicationMessagesRequest {\n 10: optional list tokens\n 20: optional string clusterName\n}\n\nstruct GetReplicationMessagesResponse {\n 10: optional map messagesByShard\n}\n\nstruct GetDomainReplicationMessagesRequest {\n // lastRetrievedMessageId is where the next fetch should begin with\n 10: optional i64 (js.type = \"Long\") lastRetrievedMessageId\n // lastProcessedMessageId is the last messageId that is processed on the passive side.\n // This can be different than lastRetrievedMessageId if passive side supports prefetching messages.\n 20: optional i64 (js.type = \"Long\") lastProcessedMessageId\n // clusterName is the name of the pulling cluster\n 30: optional string clusterName\n}\n\nstruct GetDomainReplicationMessagesResponse {\n 10: optional ReplicationMessages messages\n}\n\nstruct GetDLQReplicationMessagesRequest {\n 10: optional list taskInfos\n}\n\nstruct GetDLQReplicationMessagesResponse {\n 10: optional list replicationTasks\n}\n\nenum DLQType {\n Replication,\n Domain,\n}\n\nstruct ReadDLQMessagesRequest{\n 10: optional DLQType type\n 20: optional i32 shardID\n 30: optional string sourceCluster\n 40: optional i64 (js.type = \"Long\") inclusiveEndMessageID\n 50: optional i32 maximumPageSize\n 60: optional binary nextPageToken\n}\n\nstruct ReadDLQMessagesResponse{\n 10: optional DLQType type\n 20: optional list replicationTasks\n 30: optional binary nextPageToken\n 40: optional list replicationTasksInfo\n}\n\nstruct PurgeDLQMessagesRequest{\n 10: optional DLQType type\n 20: optional i32 shardID\n 30: optional string sourceCluster\n 40: optional i64 (js.type = \"Long\") inclusiveEndMessageID\n}\n\nstruct MergeDLQMessagesRequest{\n 10: optional DLQType type\n 20: optional i32 shardID\n 30: optional string sourceCluster\n 40: optional i64 (js.type = \"Long\") inclusiveEndMessageID\n 50: optional i32 maximumPageSize\n 60: optional binary nextPageToken\n}\n\nstruct MergeDLQMessagesResponse{\n 10: optional binary nextPageToken\n}\n" diff --git a/common/types/mapper/thrift/replicator.go b/common/types/mapper/thrift/replicator.go index 8e733f5f3c4..ec0c493b932 100644 --- a/common/types/mapper/thrift/replicator.go +++ b/common/types/mapper/thrift/replicator.go @@ -438,9 +438,10 @@ func FromReadDLQMessagesResponse(t *types.ReadDLQMessagesResponse) *replicator.R return nil } return &replicator.ReadDLQMessagesResponse{ - Type: FromDLQType(t.Type), - ReplicationTasks: FromReplicationTaskArray(t.ReplicationTasks), - NextPageToken: t.NextPageToken, + Type: FromDLQType(t.Type), + ReplicationTasks: FromReplicationTaskArray(t.ReplicationTasks), + ReplicationTasksInfo: FromReplicationTaskInfoArray(t.ReplicationTasksInfo), + NextPageToken: t.NextPageToken, } } @@ -450,9 +451,10 @@ func ToReadDLQMessagesResponse(t *replicator.ReadDLQMessagesResponse) *types.Rea return nil } return &types.ReadDLQMessagesResponse{ - Type: ToDLQType(t.Type), - ReplicationTasks: ToReplicationTaskArray(t.ReplicationTasks), - NextPageToken: t.NextPageToken, + Type: ToDLQType(t.Type), + ReplicationTasks: ToReplicationTaskArray(t.ReplicationTasks), + ReplicationTasksInfo: ToReplicationTaskInfoArray(t.ReplicationTasksInfo), + NextPageToken: t.NextPageToken, } } diff --git a/common/types/replicator.go b/common/types/replicator.go index 5a5f69b4bff..2079b017225 100644 --- a/common/types/replicator.go +++ b/common/types/replicator.go @@ -591,9 +591,10 @@ func (v *ReadDLQMessagesRequest) GetNextPageToken() (o []byte) { // ReadDLQMessagesResponse is an internal type (TBD...) type ReadDLQMessagesResponse struct { - Type *DLQType `json:"type,omitempty"` - ReplicationTasks []*ReplicationTask `json:"replicationTasks,omitempty"` - NextPageToken []byte `json:"nextPageToken,omitempty"` + Type *DLQType `json:"type,omitempty"` + ReplicationTasks []*ReplicationTask `json:"replicationTasks,omitempty"` + ReplicationTasksInfo []*ReplicationTaskInfo `json:"replicationTasksInfo,omitempty"` + NextPageToken []byte `json:"nextPageToken,omitempty"` } // GetType is an internal getter (TBD...) @@ -612,6 +613,14 @@ func (v *ReadDLQMessagesResponse) GetReplicationTasks() (o []*ReplicationTask) { return } +// GetReplicationTasksInfo is an internal getter (TBD...) +func (v *ReadDLQMessagesResponse) GetReplicationTasksInfo() (o []*ReplicationTaskInfo) { + if v != nil && v.ReplicationTasksInfo != nil { + return v.ReplicationTasksInfo + } + return +} + // GetNextPageToken is an internal getter (TBD...) func (v *ReadDLQMessagesResponse) GetNextPageToken() (o []byte) { if v != nil && v.NextPageToken != nil { diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 19907eed0f2..a6d5bf6ea1c 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -3248,7 +3248,7 @@ func (e *historyEngineImpl) ReadDLQMessages( request *types.ReadDLQMessagesRequest, ) (*types.ReadDLQMessagesResponse, error) { - tasks, token, err := e.replicationDLQHandler.ReadMessages( + tasks, taskInfo, token, err := e.replicationDLQHandler.ReadMessages( ctx, request.GetSourceCluster(), request.GetInclusiveEndMessageID(), @@ -3259,9 +3259,10 @@ func (e *historyEngineImpl) ReadDLQMessages( return nil, err } return &types.ReadDLQMessagesResponse{ - Type: request.GetType().Ptr(), - ReplicationTasks: tasks, - NextPageToken: token, + Type: request.GetType().Ptr(), + ReplicationTasks: tasks, + ReplicationTasksInfo: taskInfo, + NextPageToken: token, }, nil } diff --git a/service/history/replication/dlq_handler.go b/service/history/replication/dlq_handler.go index 3df5223e289..cd79829b6c7 100644 --- a/service/history/replication/dlq_handler.go +++ b/service/history/replication/dlq_handler.go @@ -49,7 +49,7 @@ type ( lastMessageID int64, pageSize int, pageToken []byte, - ) ([]*types.ReplicationTask, []byte, error) + ) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error) PurgeMessages( ctx context.Context, sourceCluster string, @@ -96,7 +96,7 @@ func (r *dlqHandlerImpl) ReadMessages( lastMessageID int64, pageSize int, pageToken []byte, -) ([]*types.ReplicationTask, []byte, error) { +) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error) { return r.readMessagesWithAckLevel( ctx, @@ -113,7 +113,7 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel( lastMessageID int64, pageSize int, pageToken []byte, -) ([]*types.ReplicationTask, []byte, error) { +) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error) { resp, err := r.shard.GetExecutionManager().GetReplicationTasksFromDLQ( ctx, @@ -128,12 +128,12 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel( }, ) if err != nil { - return nil, nil, err + return nil, nil, nil, err } remoteAdminClient := r.shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster) if remoteAdminClient == nil { - return nil, nil, errInvalidCluster + return nil, nil, nil, errInvalidCluster } taskInfo := make([]*types.ReplicationTaskInfo, 0, len(resp.Tasks)) @@ -159,11 +159,11 @@ func (r *dlqHandlerImpl) readMessagesWithAckLevel( }, ) if err != nil { - return nil, nil, err + return nil, nil, nil, err } } - return response.ReplicationTasks, resp.NextPageToken, nil + return response.ReplicationTasks, taskInfo, resp.NextPageToken, nil } func (r *dlqHandlerImpl) PurgeMessages( @@ -198,7 +198,7 @@ func (r *dlqHandlerImpl) MergeMessages( return nil, errInvalidCluster } - tasks, token, err := r.readMessagesWithAckLevel( + tasks, _, token, err := r.readMessagesWithAckLevel( ctx, sourceCluster, lastMessageID, diff --git a/service/history/replication/dlq_handler_mock.go b/service/history/replication/dlq_handler_mock.go index 148e412fc97..6b7b62bc57e 100644 --- a/service/history/replication/dlq_handler_mock.go +++ b/service/history/replication/dlq_handler_mock.go @@ -59,13 +59,14 @@ func (m *MockDLQHandler) EXPECT() *MockDLQHandlerMockRecorder { } // ReadMessages mocks base method -func (m *MockDLQHandler) ReadMessages(ctx context.Context, sourceCluster string, lastMessageID int64, pageSize int, pageToken []byte) ([]*types.ReplicationTask, []byte, error) { +func (m *MockDLQHandler) ReadMessages(ctx context.Context, sourceCluster string, lastMessageID int64, pageSize int, pageToken []byte) ([]*types.ReplicationTask, []*types.ReplicationTaskInfo, []byte, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReadMessages", ctx, sourceCluster, lastMessageID, pageSize, pageToken) ret0, _ := ret[0].([]*types.ReplicationTask) - ret1, _ := ret[1].([]byte) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret1, _ := ret[1].([]*types.ReplicationTaskInfo) + ret2, _ := ret[2].([]byte) + ret3, _ := ret[3].(error) + return ret0, ret1, ret2, ret3 } // ReadMessages indicates an expected call of ReadMessages diff --git a/service/history/replication/dlq_handler_test.go b/service/history/replication/dlq_handler_test.go index 9f085460c01..0b3c76b1345 100644 --- a/service/history/replication/dlq_handler_test.go +++ b/service/history/replication/dlq_handler_test.go @@ -145,9 +145,12 @@ func (s *dlqHandlerSuite) TestReadMessages_OK() { s.adminClient.EXPECT(). GetDLQReplicationMessages(ctx, gomock.Any()). Return(&types.GetDLQReplicationMessagesResponse{}, nil) - tasks, token, err := s.messageHandler.ReadMessages(ctx, s.sourceCluster, lastMessageID, pageSize, pageToken) + tasks, info, token, err := s.messageHandler.ReadMessages(ctx, s.sourceCluster, lastMessageID, pageSize, pageToken) s.NoError(err) s.Nil(token) + s.Equal(resp.Tasks[0].GetDomainID(), info[0].GetDomainID()) + s.Equal(resp.Tasks[0].GetWorkflowID(), info[0].GetWorkflowID()) + s.Equal(resp.Tasks[0].GetRunID(), info[0].GetRunID()) s.Nil(tasks) } diff --git a/tools/cli/admin.go b/tools/cli/admin.go index ecfb9b9b55e..c3dcb78ae7a 100644 --- a/tools/cli/admin.go +++ b/tools/cli/admin.go @@ -705,6 +705,10 @@ func newAdminDLQCommands() []cli.Command { Name: FlagOutputFilenameWithAlias, Usage: "Output file to write to, if not provided output is written to stdout", }, + cli.BoolFlag{ + Name: FlagDLQRawTask, + Usage: "Show DLQ raw task information", + }, }, Action: func(c *cli.Context) { AdminGetDLQMessages(c) diff --git a/tools/cli/adminDLQCommands.go b/tools/cli/adminDLQCommands.go index 4dd9152126e..a4444209374 100644 --- a/tools/cli/adminDLQCommands.go +++ b/tools/cli/adminDLQCommands.go @@ -21,9 +21,8 @@ package cli import ( - "bufio" + "encoding/json" "fmt" - "os" "time" "github.com/urfave/cli" @@ -51,6 +50,8 @@ func AdminGetDLQMessages(c *cli.Context) { outputFile := getOutputFile(c.String(FlagOutputFilename)) defer outputFile.Close() + showRawTask := c.Bool(FlagDLQRawTask) + var rawTasksInfo []*types.ReplicationTaskInfo remainingMessageCount := common.EndMessageID if c.IsSet(FlagMaxMessageCount) { remainingMessageCount = c.Int64(FlagMaxMessageCount) @@ -76,6 +77,10 @@ func AdminGetDLQMessages(c *cli.Context) { for _, item := range resp.GetReplicationTasks() { paginateItems = append(paginateItems, item) } + if showRawTask { + rawTasksInfo = append(rawTasksInfo, resp.GetReplicationTasksInfo()...) + } + return paginateItems, resp.GetNextPageToken(), err } @@ -100,6 +105,31 @@ func AdminGetDLQMessages(c *cli.Context) { ErrorAndExit("fail to print dlq messages.", err) } } + + if showRawTask { + _, err := outputFile.WriteString(fmt.Sprintf("#### REPLICATION DLQ RAW TASKS INFO ####\n")) + if err != nil { + ErrorAndExit("fail to print dlq raw tasks.", err) + } + for _, info := range rawTasksInfo { + str, err := json.Marshal(info) + if err != nil { + ErrorAndExit("fail to encode dlq raw tasks.", err) + } + + if _, err = outputFile.WriteString(fmt.Sprintf("%v\n", string(str))); err != nil { + ErrorAndExit("fail to print dlq raw tasks.", err) + } + } + } else { + if lastReadMessageID == 0 && len(rawTasksInfo) > 0 { + if _, err := outputFile.WriteString( + fmt.Sprintf("WARN: Received empty replication task but metadata is not empty. Please use %v to show metadata task.\n", FlagDLQRawTask), + ); err != nil { + ErrorAndExit("fail to print warning message.", err) + } + } + } } // AdminPurgeDLQMessages deletes messages from DLQ @@ -156,7 +186,7 @@ func AdminMergeDLQMessages(c *cli.Context) { for { response, err := adminClient.MergeDLQMessages(ctx, request) if err != nil { - fmt.Printf("Failed to merge DLQ message in shard %v with error: %v.\n", shardID, err) + ErrorAndExit(fmt.Sprintf("Failed to merge DLQ message in shard %v.", shardID), err) } if response == nil || len(response.NextPageToken) == 0 { @@ -181,15 +211,3 @@ func toQueueType(dlqType string) *types.DLQType { } return nil } - -func confirmOrExit(message string) { - fmt.Println(message + " (Y/n)") - reader := bufio.NewReader(os.Stdin) - confirm, err := reader.ReadByte() - if err != nil { - panic(err) - } - if confirm != 'Y' { - osExit(0) - } -} diff --git a/tools/cli/flags.go b/tools/cli/flags.go index d6aadcfde6e..76fe1109ad9 100644 --- a/tools/cli/flags.go +++ b/tools/cli/flags.go @@ -231,6 +231,7 @@ const ( FlagTLSEnableHostVerification = "tls_enable_host_verification" FlagDLQType = "dlq_type" FlagDLQTypeWithAlias = FlagDLQType + ", dt" + FlagDLQRawTask = "dlq_raw_task" FlagMaxMessageCount = "max_message_count" FlagMaxMessageCountWithAlias = FlagMaxMessageCount + ", mmc" FlagLastMessageID = "last_message_id"