From de54ab6c6d438a0470959c1fadefad23e601028b Mon Sep 17 00:00:00 2001 From: Vytautas Date: Mon, 26 Oct 2020 12:20:51 +0200 Subject: [PATCH] Separate thrift handler for history service (#3646) This is purely a mechanical refactoring that introduces additional ThriftHandler wrapper. For now it only forwards request to the original handler. Later on this will become a point for conversion between internal types and thrift types. Underlying handler will be independent of rpc protocol types. - Created Handler interface for history service - Previous Handler struct renamed to handlerImpl - Created ThriftHandler wrapper on top of Handler interface. At this point it only forward requests as is to underlying handler. - Created Mock for Handler interface. - Covered ThriftHandler with units tests. Test that requests are forwarded to underlying handler and response is returned as is. - Replaced handler registration. Instead of registering handlerImpl directly, wrap it with ThriftHandler and register it instead. --- service/history/handler.go | 163 ++++--- service/history/handler_mock.go | 643 ++++++++++++++++++++++++++ service/history/service.go | 6 +- service/history/thriftHandler.go | 249 ++++++++++ service/history/thriftHandler_test.go | 264 +++++++++++ 5 files changed, 1261 insertions(+), 64 deletions(-) create mode 100644 service/history/handler_mock.go create mode 100644 service/history/thriftHandler.go create mode 100644 service/history/thriftHandler_test.go diff --git a/service/history/handler.go b/service/history/handler.go index 0e339245956..1cf28992dfe 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -31,7 +31,6 @@ import ( "go.uber.org/yarpc/yarpcerrors" "github.com/uber/cadence/.gen/go/health" - "github.com/uber/cadence/.gen/go/health/metaserver" hist "github.com/uber/cadence/.gen/go/history" "github.com/uber/cadence/.gen/go/history/historyserviceserver" r "github.com/uber/cadence/.gen/go/replicator" @@ -53,9 +52,55 @@ import ( "github.com/uber/cadence/service/history/task" ) -// Handler - Thrift handler interface for history service type ( - Handler struct { + //go:generate mockgen -copyright_file=../../LICENSE -package $GOPACKAGE -source $GOFILE -destination handler_mock.go -package history github.com/uber/cadence/service/history Handler + + // Handler interface for history service + Handler interface { + Health(context.Context) (*health.HealthStatus, error) + CloseShard(context.Context, *gen.CloseShardRequest) error + DescribeHistoryHost(context.Context, *gen.DescribeHistoryHostRequest) (*gen.DescribeHistoryHostResponse, error) + DescribeMutableState(context.Context, *hist.DescribeMutableStateRequest) (*hist.DescribeMutableStateResponse, error) + DescribeQueue(context.Context, *gen.DescribeQueueRequest) (*gen.DescribeQueueResponse, error) + DescribeWorkflowExecution(context.Context, *hist.DescribeWorkflowExecutionRequest) (*gen.DescribeWorkflowExecutionResponse, error) + GetDLQReplicationMessages(context.Context, *r.GetDLQReplicationMessagesRequest) (*r.GetDLQReplicationMessagesResponse, error) + GetMutableState(context.Context, *hist.GetMutableStateRequest) (*hist.GetMutableStateResponse, error) + GetReplicationMessages(context.Context, *r.GetReplicationMessagesRequest) (*r.GetReplicationMessagesResponse, error) + MergeDLQMessages(context.Context, *r.MergeDLQMessagesRequest) (*r.MergeDLQMessagesResponse, error) + NotifyFailoverMarkers(context.Context, *hist.NotifyFailoverMarkersRequest) error + PollMutableState(context.Context, *hist.PollMutableStateRequest) (*hist.PollMutableStateResponse, error) + PurgeDLQMessages(context.Context, *r.PurgeDLQMessagesRequest) error + QueryWorkflow(context.Context, *hist.QueryWorkflowRequest) (*hist.QueryWorkflowResponse, error) + ReadDLQMessages(context.Context, *r.ReadDLQMessagesRequest) (*r.ReadDLQMessagesResponse, error) + ReapplyEvents(context.Context, *hist.ReapplyEventsRequest) error + RecordActivityTaskHeartbeat(context.Context, *hist.RecordActivityTaskHeartbeatRequest) (*gen.RecordActivityTaskHeartbeatResponse, error) + RecordActivityTaskStarted(context.Context, *hist.RecordActivityTaskStartedRequest) (*hist.RecordActivityTaskStartedResponse, error) + RecordChildExecutionCompleted(context.Context, *hist.RecordChildExecutionCompletedRequest) error + RecordDecisionTaskStarted(context.Context, *hist.RecordDecisionTaskStartedRequest) (*hist.RecordDecisionTaskStartedResponse, error) + RefreshWorkflowTasks(context.Context, *hist.RefreshWorkflowTasksRequest) error + RemoveSignalMutableState(context.Context, *hist.RemoveSignalMutableStateRequest) error + RemoveTask(context.Context, *gen.RemoveTaskRequest) error + ReplicateEventsV2(context.Context, *hist.ReplicateEventsV2Request) error + RequestCancelWorkflowExecution(context.Context, *hist.RequestCancelWorkflowExecutionRequest) error + ResetQueue(context.Context, *gen.ResetQueueRequest) error + ResetStickyTaskList(context.Context, *hist.ResetStickyTaskListRequest) (*hist.ResetStickyTaskListResponse, error) + ResetWorkflowExecution(context.Context, *hist.ResetWorkflowExecutionRequest) (*gen.ResetWorkflowExecutionResponse, error) + RespondActivityTaskCanceled(context.Context, *hist.RespondActivityTaskCanceledRequest) error + RespondActivityTaskCompleted(context.Context, *hist.RespondActivityTaskCompletedRequest) error + RespondActivityTaskFailed(context.Context, *hist.RespondActivityTaskFailedRequest) error + RespondDecisionTaskCompleted(context.Context, *hist.RespondDecisionTaskCompletedRequest) (*hist.RespondDecisionTaskCompletedResponse, error) + RespondDecisionTaskFailed(context.Context, *hist.RespondDecisionTaskFailedRequest) error + ScheduleDecisionTask(context.Context, *hist.ScheduleDecisionTaskRequest) error + SignalWithStartWorkflowExecution(context.Context, *hist.SignalWithStartWorkflowExecutionRequest) (*gen.StartWorkflowExecutionResponse, error) + SignalWorkflowExecution(context.Context, *hist.SignalWorkflowExecutionRequest) error + StartWorkflowExecution(context.Context, *hist.StartWorkflowExecutionRequest) (*gen.StartWorkflowExecutionResponse, error) + SyncActivity(context.Context, *hist.SyncActivityRequest) error + SyncShardStatus(context.Context, *hist.SyncShardStatusRequest) error + TerminateWorkflowExecution(context.Context, *hist.TerminateWorkflowExecutionRequest) error + } + + // handlerImpl is an implementation for history service independent of wire protocol + handlerImpl struct { resource.Resource shuttingDown int32 @@ -71,8 +116,8 @@ type ( } ) -var _ historyserviceserver.Interface = (*Handler)(nil) -var _ shard.EngineFactory = (*Handler)(nil) +var _ historyserviceserver.Interface = (*handlerImpl)(nil) +var _ shard.EngineFactory = (*handlerImpl)(nil) var ( errDomainNotSet = &gen.BadRequestError{Message: "Domain not set on request."} @@ -92,8 +137,8 @@ var ( func NewHandler( resource resource.Resource, config *config.Config, -) *Handler { - handler := &Handler{ +) *handlerImpl { + handler := &handlerImpl{ Resource: resource, config: config, tokenSerializer: common.NewJSONTaskTokenSerializer(), @@ -109,14 +154,8 @@ func NewHandler( return handler } -// RegisterHandler register this handler, must be called before Start() -func (h *Handler) RegisterHandler() { - h.GetDispatcher().Register(historyserviceserver.New(h)) - h.GetDispatcher().Register(metaserver.New(h)) -} - // Start starts the handler -func (h *Handler) Start() { +func (h *handlerImpl) Start() { h.replicationTaskFetchers = replication.NewTaskFetchers( h.GetLogger(), @@ -176,7 +215,7 @@ func (h *Handler) Start() { } // Stop stops the handler -func (h *Handler) Stop() { +func (h *handlerImpl) Stop() { h.PrepareToStop() h.replicationTaskFetchers.Stop() if h.queueTaskProcessor != nil { @@ -188,16 +227,16 @@ func (h *Handler) Stop() { } // PrepareToStop starts graceful traffic drain in preparation for shutdown -func (h *Handler) PrepareToStop() { +func (h *handlerImpl) PrepareToStop() { atomic.StoreInt32(&h.shuttingDown, 1) } -func (h *Handler) isShuttingDown() bool { +func (h *handlerImpl) isShuttingDown() bool { return atomic.LoadInt32(&h.shuttingDown) != 0 } // CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard -func (h *Handler) CreateEngine( +func (h *handlerImpl) CreateEngine( shardContext shard.Context, ) engine.Engine { return NewEngineWithShardContext( @@ -216,7 +255,7 @@ func (h *Handler) CreateEngine( } // Health is for health check -func (h *Handler) Health(ctx context.Context) (*health.HealthStatus, error) { +func (h *handlerImpl) Health(ctx context.Context) (*health.HealthStatus, error) { h.startWG.Wait() h.GetLogger().Debug("History health check endpoint reached.") hs := &health.HealthStatus{Ok: true, Msg: common.StringPtr("OK")} @@ -224,7 +263,7 @@ func (h *Handler) Health(ctx context.Context) (*health.HealthStatus, error) { } // RecordActivityTaskHeartbeat - Record Activity Task Heart beat. -func (h *Handler) RecordActivityTaskHeartbeat( +func (h *handlerImpl) RecordActivityTaskHeartbeat( ctx context.Context, wrappedRequest *hist.RecordActivityTaskHeartbeatRequest, ) (resp *gen.RecordActivityTaskHeartbeatResponse, retError error) { @@ -273,7 +312,7 @@ func (h *Handler) RecordActivityTaskHeartbeat( } // RecordActivityTaskStarted - Record Activity Task started. -func (h *Handler) RecordActivityTaskStarted( +func (h *handlerImpl) RecordActivityTaskStarted( ctx context.Context, recordRequest *hist.RecordActivityTaskStartedRequest, ) (resp *hist.RecordActivityTaskStartedResponse, retError error) { @@ -311,7 +350,7 @@ func (h *Handler) RecordActivityTaskStarted( } // RecordDecisionTaskStarted - Record Decision Task started. -func (h *Handler) RecordDecisionTaskStarted( +func (h *handlerImpl) RecordDecisionTaskStarted( ctx context.Context, recordRequest *hist.RecordDecisionTaskStartedRequest, ) (resp *hist.RecordDecisionTaskStartedResponse, retError error) { @@ -364,7 +403,7 @@ func (h *Handler) RecordDecisionTaskStarted( } // RespondActivityTaskCompleted - records completion of an activity task -func (h *Handler) RespondActivityTaskCompleted( +func (h *handlerImpl) RespondActivityTaskCompleted( ctx context.Context, wrappedRequest *hist.RespondActivityTaskCompletedRequest, ) (retError error) { @@ -413,7 +452,7 @@ func (h *Handler) RespondActivityTaskCompleted( } // RespondActivityTaskFailed - records failure of an activity task -func (h *Handler) RespondActivityTaskFailed( +func (h *handlerImpl) RespondActivityTaskFailed( ctx context.Context, wrappedRequest *hist.RespondActivityTaskFailedRequest, ) (retError error) { @@ -462,7 +501,7 @@ func (h *Handler) RespondActivityTaskFailed( } // RespondActivityTaskCanceled - records failure of an activity task -func (h *Handler) RespondActivityTaskCanceled( +func (h *handlerImpl) RespondActivityTaskCanceled( ctx context.Context, wrappedRequest *hist.RespondActivityTaskCanceledRequest, ) (retError error) { @@ -511,7 +550,7 @@ func (h *Handler) RespondActivityTaskCanceled( } // RespondDecisionTaskCompleted - records completion of a decision task -func (h *Handler) RespondDecisionTaskCompleted( +func (h *handlerImpl) RespondDecisionTaskCompleted( ctx context.Context, wrappedRequest *hist.RespondDecisionTaskCompletedRequest, ) (resp *hist.RespondDecisionTaskCompletedResponse, retError error) { @@ -569,7 +608,7 @@ func (h *Handler) RespondDecisionTaskCompleted( } // RespondDecisionTaskFailed - failed response to decision task -func (h *Handler) RespondDecisionTaskFailed( +func (h *handlerImpl) RespondDecisionTaskFailed( ctx context.Context, wrappedRequest *hist.RespondDecisionTaskFailedRequest, ) (retError error) { @@ -637,7 +676,7 @@ func (h *Handler) RespondDecisionTaskFailed( } // StartWorkflowExecution - creates a new workflow execution -func (h *Handler) StartWorkflowExecution( +func (h *handlerImpl) StartWorkflowExecution( ctx context.Context, wrappedRequest *hist.StartWorkflowExecutionRequest, ) (resp *gen.StartWorkflowExecutionResponse, retError error) { @@ -675,7 +714,7 @@ func (h *Handler) StartWorkflowExecution( } // DescribeHistoryHost returns information about the internal states of a history host -func (h *Handler) DescribeHistoryHost( +func (h *handlerImpl) DescribeHistoryHost( ctx context.Context, request *gen.DescribeHistoryHostRequest, ) (resp *gen.DescribeHistoryHostResponse, retError error) { @@ -708,7 +747,7 @@ func (h *Handler) DescribeHistoryHost( } // RemoveTask returns information about the internal states of a history host -func (h *Handler) RemoveTask( +func (h *handlerImpl) RemoveTask( ctx context.Context, request *gen.RemoveTaskRequest, ) (retError error) { @@ -737,7 +776,7 @@ func (h *Handler) RemoveTask( } // CloseShard closes a shard hosted by this instance -func (h *Handler) CloseShard( +func (h *handlerImpl) CloseShard( ctx context.Context, request *gen.CloseShardRequest, ) (retError error) { @@ -746,7 +785,7 @@ func (h *Handler) CloseShard( } // ResetQueue resets processing queue states -func (h *Handler) ResetQueue( +func (h *handlerImpl) ResetQueue( ctx context.Context, request *gen.ResetQueueRequest, ) (retError error) { @@ -780,7 +819,7 @@ func (h *Handler) ResetQueue( } // DescribeQueue describes processing queue states -func (h *Handler) DescribeQueue( +func (h *handlerImpl) DescribeQueue( ctx context.Context, request *gen.DescribeQueueRequest, ) (resp *gen.DescribeQueueResponse, retError error) { @@ -814,7 +853,7 @@ func (h *Handler) DescribeQueue( } // DescribeMutableState - returns the internal analysis of workflow execution state -func (h *Handler) DescribeMutableState( +func (h *handlerImpl) DescribeMutableState( ctx context.Context, request *hist.DescribeMutableStateRequest, ) (resp *hist.DescribeMutableStateResponse, retError error) { @@ -847,7 +886,7 @@ func (h *Handler) DescribeMutableState( } // GetMutableState - returns the id of the next event in the execution's history -func (h *Handler) GetMutableState( +func (h *handlerImpl) GetMutableState( ctx context.Context, getRequest *hist.GetMutableStateRequest, ) (resp *hist.GetMutableStateResponse, retError error) { @@ -884,7 +923,7 @@ func (h *Handler) GetMutableState( } // PollMutableState - returns the id of the next event in the execution's history -func (h *Handler) PollMutableState( +func (h *handlerImpl) PollMutableState( ctx context.Context, getRequest *hist.PollMutableStateRequest, ) (resp *hist.PollMutableStateResponse, retError error) { @@ -921,7 +960,7 @@ func (h *Handler) PollMutableState( } // DescribeWorkflowExecution returns information about the specified workflow execution. -func (h *Handler) DescribeWorkflowExecution( +func (h *handlerImpl) DescribeWorkflowExecution( ctx context.Context, request *hist.DescribeWorkflowExecutionRequest, ) (resp *gen.DescribeWorkflowExecutionResponse, retError error) { @@ -958,7 +997,7 @@ func (h *Handler) DescribeWorkflowExecution( } // RequestCancelWorkflowExecution - requests cancellation of a workflow -func (h *Handler) RequestCancelWorkflowExecution( +func (h *handlerImpl) RequestCancelWorkflowExecution( ctx context.Context, request *hist.RequestCancelWorkflowExecutionRequest, ) (retError error) { @@ -1007,7 +1046,7 @@ func (h *Handler) RequestCancelWorkflowExecution( // SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in // WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution. -func (h *Handler) SignalWorkflowExecution( +func (h *handlerImpl) SignalWorkflowExecution( ctx context.Context, wrappedRequest *hist.SignalWorkflowExecutionRequest, ) (retError error) { @@ -1053,7 +1092,7 @@ func (h *Handler) SignalWorkflowExecution( // and a decision task being created for the execution. // If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled // event recorded in history, and a decision task being created for the execution -func (h *Handler) SignalWithStartWorkflowExecution( +func (h *handlerImpl) SignalWithStartWorkflowExecution( ctx context.Context, wrappedRequest *hist.SignalWithStartWorkflowExecutionRequest, ) (resp *gen.StartWorkflowExecutionResponse, retError error) { @@ -1096,7 +1135,7 @@ func (h *Handler) SignalWithStartWorkflowExecution( // RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently // used to clean execution info when signal decision finished. -func (h *Handler) RemoveSignalMutableState( +func (h *handlerImpl) RemoveSignalMutableState( ctx context.Context, wrappedRequest *hist.RemoveSignalMutableStateRequest, ) (retError error) { @@ -1139,7 +1178,7 @@ func (h *Handler) RemoveSignalMutableState( // TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event // in the history and immediately terminating the execution instance. -func (h *Handler) TerminateWorkflowExecution( +func (h *handlerImpl) TerminateWorkflowExecution( ctx context.Context, wrappedRequest *hist.TerminateWorkflowExecutionRequest, ) (retError error) { @@ -1182,7 +1221,7 @@ func (h *Handler) TerminateWorkflowExecution( // ResetWorkflowExecution reset an existing workflow execution // in the history and immediately terminating the execution instance. -func (h *Handler) ResetWorkflowExecution( +func (h *handlerImpl) ResetWorkflowExecution( ctx context.Context, wrappedRequest *hist.ResetWorkflowExecutionRequest, ) (resp *gen.ResetWorkflowExecutionResponse, retError error) { @@ -1224,7 +1263,7 @@ func (h *Handler) ResetWorkflowExecution( } // QueryWorkflow queries a workflow. -func (h *Handler) QueryWorkflow( +func (h *handlerImpl) QueryWorkflow( ctx context.Context, request *hist.QueryWorkflowRequest, ) (resp *hist.QueryWorkflowResponse, retError error) { @@ -1267,7 +1306,7 @@ func (h *Handler) QueryWorkflow( // used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts // child execution without creating the decision task and then calls this API after updating the mutable state of // parent execution. -func (h *Handler) ScheduleDecisionTask( +func (h *handlerImpl) ScheduleDecisionTask( ctx context.Context, request *hist.ScheduleDecisionTaskRequest, ) (retError error) { @@ -1314,7 +1353,7 @@ func (h *Handler) ScheduleDecisionTask( // RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent. // This is mainly called by transfer queue processor during the processing of DeleteExecution task. -func (h *Handler) RecordChildExecutionCompleted( +func (h *handlerImpl) RecordChildExecutionCompleted( ctx context.Context, request *hist.RecordChildExecutionCompletedRequest, ) (retError error) { @@ -1366,7 +1405,7 @@ func (h *Handler) RecordChildExecutionCompleted( // 3. ClientLibraryVersion // 4. ClientFeatureVersion // 5. ClientImpl -func (h *Handler) ResetStickyTaskList( +func (h *handlerImpl) ResetStickyTaskList( ctx context.Context, resetRequest *hist.ResetStickyTaskListRequest, ) (resp *hist.ResetStickyTaskListResponse, retError error) { @@ -1407,7 +1446,7 @@ func (h *Handler) ResetStickyTaskList( } // ReplicateEventsV2 is called by processor to replicate history events for passive domains -func (h *Handler) ReplicateEventsV2( +func (h *handlerImpl) ReplicateEventsV2( ctx context.Context, replicateRequest *hist.ReplicateEventsV2Request, ) (retError error) { @@ -1449,7 +1488,7 @@ func (h *Handler) ReplicateEventsV2( } // SyncShardStatus is called by processor to sync history shard information from another cluster -func (h *Handler) SyncShardStatus( +func (h *handlerImpl) SyncShardStatus( ctx context.Context, syncShardStatusRequest *hist.SyncShardStatusRequest, ) (retError error) { @@ -1497,7 +1536,7 @@ func (h *Handler) SyncShardStatus( } // SyncActivity is called by processor to sync activity -func (h *Handler) SyncActivity( +func (h *handlerImpl) SyncActivity( ctx context.Context, syncActivityRequest *hist.SyncActivityRequest, ) (retError error) { @@ -1546,7 +1585,7 @@ func (h *Handler) SyncActivity( } // GetReplicationMessages is called by remote peers to get replicated messages for cross DC replication -func (h *Handler) GetReplicationMessages( +func (h *handlerImpl) GetReplicationMessages( ctx context.Context, request *r.GetReplicationMessagesRequest, ) (resp *r.GetReplicationMessagesResponse, retError error) { @@ -1607,7 +1646,7 @@ func (h *Handler) GetReplicationMessages( } // GetDLQReplicationMessages is called by remote peers to get replicated messages for DLQ merging -func (h *Handler) GetDLQReplicationMessages( +func (h *handlerImpl) GetDLQReplicationMessages( ctx context.Context, request *r.GetDLQReplicationMessagesRequest, ) (resp *r.GetDLQReplicationMessagesResponse, retError error) { @@ -1684,7 +1723,7 @@ func (h *Handler) GetDLQReplicationMessages( } // ReapplyEvents applies stale events to the current workflow and the current run -func (h *Handler) ReapplyEvents( +func (h *handlerImpl) ReapplyEvents( ctx context.Context, request *hist.ReapplyEventsRequest, ) (retError error) { @@ -1730,7 +1769,7 @@ func (h *Handler) ReapplyEvents( } // ReadDLQMessages reads replication DLQ messages -func (h *Handler) ReadDLQMessages( +func (h *handlerImpl) ReadDLQMessages( ctx context.Context, request *r.ReadDLQMessagesRequest, ) (resp *r.ReadDLQMessagesResponse, retError error) { @@ -1756,7 +1795,7 @@ func (h *Handler) ReadDLQMessages( } // PurgeDLQMessages deletes replication DLQ messages -func (h *Handler) PurgeDLQMessages( +func (h *handlerImpl) PurgeDLQMessages( ctx context.Context, request *r.PurgeDLQMessagesRequest, ) (retError error) { @@ -1782,7 +1821,7 @@ func (h *Handler) PurgeDLQMessages( } // MergeDLQMessages reads and applies replication DLQ messages -func (h *Handler) MergeDLQMessages( +func (h *handlerImpl) MergeDLQMessages( ctx context.Context, request *r.MergeDLQMessagesRequest, ) (resp *r.MergeDLQMessagesResponse, retError error) { @@ -1808,7 +1847,7 @@ func (h *Handler) MergeDLQMessages( } // RefreshWorkflowTasks refreshes all the tasks of a workflow -func (h *Handler) RefreshWorkflowTasks( +func (h *handlerImpl) RefreshWorkflowTasks( ctx context.Context, request *hist.RefreshWorkflowTasksRequest) (retError error) { @@ -1847,7 +1886,7 @@ func (h *Handler) RefreshWorkflowTasks( // NotifyFailoverMarkers sends the failover markers to failover coordinator. // The coordinator decides when the failover finishes based on received failover marker. -func (h *Handler) NotifyFailoverMarkers( +func (h *handlerImpl) NotifyFailoverMarkers( ctx context.Context, request *hist.NotifyFailoverMarkersRequest, ) (retError error) { @@ -1868,7 +1907,7 @@ func (h *Handler) NotifyFailoverMarkers( // convertError is a helper method to convert ShardOwnershipLostError from persistence layer returned by various // HistoryEngine API calls to ShardOwnershipLost error return by HistoryService for client to be redirected to the // correct shard. -func (h *Handler) convertError(err error) error { +func (h *handlerImpl) convertError(err error) error { switch err.(type) { case *persistence.ShardOwnershipLostError: shardID := err.(*persistence.ShardOwnershipLostError).ShardID @@ -1891,7 +1930,7 @@ func (h *Handler) convertError(err error) error { return err } -func (h *Handler) updateErrorMetric( +func (h *handlerImpl) updateErrorMetric( scope int, domainID string, workflowID string, @@ -1941,7 +1980,7 @@ func (h *Handler) updateErrorMetric( } } -func (h *Handler) error( +func (h *handlerImpl) error( err error, scope int, domainID string, @@ -1954,7 +1993,7 @@ func (h *Handler) error( return err } -func (h *Handler) getLoggerWithTags( +func (h *handlerImpl) getLoggerWithTags( domainID string, workflowID string, ) log.Logger { diff --git a/service/history/handler_mock.go b/service/history/handler_mock.go new file mode 100644 index 00000000000..4f98b2e34a0 --- /dev/null +++ b/service/history/handler_mock.go @@ -0,0 +1,643 @@ +// The MIT License (MIT) +// +// Copyright (c) 2017-2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/uber/cadence/service/history (interfaces: Handler) + +// Package history is a generated GoMock package. +package history + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + health "github.com/uber/cadence/.gen/go/health" + history "github.com/uber/cadence/.gen/go/history" + replicator "github.com/uber/cadence/.gen/go/replicator" + shared "github.com/uber/cadence/.gen/go/shared" +) + +// MockHandler is a mock of Handler interface +type MockHandler struct { + ctrl *gomock.Controller + recorder *MockHandlerMockRecorder +} + +// MockHandlerMockRecorder is the mock recorder for MockHandler +type MockHandlerMockRecorder struct { + mock *MockHandler +} + +// NewMockHandler creates a new mock instance +func NewMockHandler(ctrl *gomock.Controller) *MockHandler { + mock := &MockHandler{ctrl: ctrl} + mock.recorder = &MockHandlerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockHandler) EXPECT() *MockHandlerMockRecorder { + return m.recorder +} + +// CloseShard mocks base method +func (m *MockHandler) CloseShard(arg0 context.Context, arg1 *shared.CloseShardRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseShard", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseShard indicates an expected call of CloseShard +func (mr *MockHandlerMockRecorder) CloseShard(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseShard", reflect.TypeOf((*MockHandler)(nil).CloseShard), arg0, arg1) +} + +// DescribeHistoryHost mocks base method +func (m *MockHandler) DescribeHistoryHost(arg0 context.Context, arg1 *shared.DescribeHistoryHostRequest) (*shared.DescribeHistoryHostResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeHistoryHost", arg0, arg1) + ret0, _ := ret[0].(*shared.DescribeHistoryHostResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeHistoryHost indicates an expected call of DescribeHistoryHost +func (mr *MockHandlerMockRecorder) DescribeHistoryHost(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeHistoryHost", reflect.TypeOf((*MockHandler)(nil).DescribeHistoryHost), arg0, arg1) +} + +// DescribeMutableState mocks base method +func (m *MockHandler) DescribeMutableState(arg0 context.Context, arg1 *history.DescribeMutableStateRequest) (*history.DescribeMutableStateResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeMutableState", arg0, arg1) + ret0, _ := ret[0].(*history.DescribeMutableStateResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeMutableState indicates an expected call of DescribeMutableState +func (mr *MockHandlerMockRecorder) DescribeMutableState(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeMutableState", reflect.TypeOf((*MockHandler)(nil).DescribeMutableState), arg0, arg1) +} + +// DescribeQueue mocks base method +func (m *MockHandler) DescribeQueue(arg0 context.Context, arg1 *shared.DescribeQueueRequest) (*shared.DescribeQueueResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeQueue", arg0, arg1) + ret0, _ := ret[0].(*shared.DescribeQueueResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeQueue indicates an expected call of DescribeQueue +func (mr *MockHandlerMockRecorder) DescribeQueue(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeQueue", reflect.TypeOf((*MockHandler)(nil).DescribeQueue), arg0, arg1) +} + +// DescribeWorkflowExecution mocks base method +func (m *MockHandler) DescribeWorkflowExecution(arg0 context.Context, arg1 *history.DescribeWorkflowExecutionRequest) (*shared.DescribeWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DescribeWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(*shared.DescribeWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeWorkflowExecution indicates an expected call of DescribeWorkflowExecution +func (mr *MockHandlerMockRecorder) DescribeWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeWorkflowExecution", reflect.TypeOf((*MockHandler)(nil).DescribeWorkflowExecution), arg0, arg1) +} + +// GetDLQReplicationMessages mocks base method +func (m *MockHandler) GetDLQReplicationMessages(arg0 context.Context, arg1 *replicator.GetDLQReplicationMessagesRequest) (*replicator.GetDLQReplicationMessagesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetDLQReplicationMessages", arg0, arg1) + ret0, _ := ret[0].(*replicator.GetDLQReplicationMessagesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetDLQReplicationMessages indicates an expected call of GetDLQReplicationMessages +func (mr *MockHandlerMockRecorder) GetDLQReplicationMessages(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDLQReplicationMessages", reflect.TypeOf((*MockHandler)(nil).GetDLQReplicationMessages), arg0, arg1) +} + +// GetMutableState mocks base method +func (m *MockHandler) GetMutableState(arg0 context.Context, arg1 *history.GetMutableStateRequest) (*history.GetMutableStateResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMutableState", arg0, arg1) + ret0, _ := ret[0].(*history.GetMutableStateResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMutableState indicates an expected call of GetMutableState +func (mr *MockHandlerMockRecorder) GetMutableState(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMutableState", reflect.TypeOf((*MockHandler)(nil).GetMutableState), arg0, arg1) +} + +// GetReplicationMessages mocks base method +func (m *MockHandler) GetReplicationMessages(arg0 context.Context, arg1 *replicator.GetReplicationMessagesRequest) (*replicator.GetReplicationMessagesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetReplicationMessages", arg0, arg1) + ret0, _ := ret[0].(*replicator.GetReplicationMessagesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetReplicationMessages indicates an expected call of GetReplicationMessages +func (mr *MockHandlerMockRecorder) GetReplicationMessages(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReplicationMessages", reflect.TypeOf((*MockHandler)(nil).GetReplicationMessages), arg0, arg1) +} + +// Health mocks base method +func (m *MockHandler) Health(arg0 context.Context) (*health.HealthStatus, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Health", arg0) + ret0, _ := ret[0].(*health.HealthStatus) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Health indicates an expected call of Health +func (mr *MockHandlerMockRecorder) Health(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Health", reflect.TypeOf((*MockHandler)(nil).Health), arg0) +} + +// MergeDLQMessages mocks base method +func (m *MockHandler) MergeDLQMessages(arg0 context.Context, arg1 *replicator.MergeDLQMessagesRequest) (*replicator.MergeDLQMessagesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MergeDLQMessages", arg0, arg1) + ret0, _ := ret[0].(*replicator.MergeDLQMessagesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MergeDLQMessages indicates an expected call of MergeDLQMessages +func (mr *MockHandlerMockRecorder) MergeDLQMessages(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MergeDLQMessages", reflect.TypeOf((*MockHandler)(nil).MergeDLQMessages), arg0, arg1) +} + +// NotifyFailoverMarkers mocks base method +func (m *MockHandler) NotifyFailoverMarkers(arg0 context.Context, arg1 *history.NotifyFailoverMarkersRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NotifyFailoverMarkers", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// NotifyFailoverMarkers indicates an expected call of NotifyFailoverMarkers +func (mr *MockHandlerMockRecorder) NotifyFailoverMarkers(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NotifyFailoverMarkers", reflect.TypeOf((*MockHandler)(nil).NotifyFailoverMarkers), arg0, arg1) +} + +// PollMutableState mocks base method +func (m *MockHandler) PollMutableState(arg0 context.Context, arg1 *history.PollMutableStateRequest) (*history.PollMutableStateResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PollMutableState", arg0, arg1) + ret0, _ := ret[0].(*history.PollMutableStateResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PollMutableState indicates an expected call of PollMutableState +func (mr *MockHandlerMockRecorder) PollMutableState(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollMutableState", reflect.TypeOf((*MockHandler)(nil).PollMutableState), arg0, arg1) +} + +// PurgeDLQMessages mocks base method +func (m *MockHandler) PurgeDLQMessages(arg0 context.Context, arg1 *replicator.PurgeDLQMessagesRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PurgeDLQMessages", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// PurgeDLQMessages indicates an expected call of PurgeDLQMessages +func (mr *MockHandlerMockRecorder) PurgeDLQMessages(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PurgeDLQMessages", reflect.TypeOf((*MockHandler)(nil).PurgeDLQMessages), arg0, arg1) +} + +// QueryWorkflow mocks base method +func (m *MockHandler) QueryWorkflow(arg0 context.Context, arg1 *history.QueryWorkflowRequest) (*history.QueryWorkflowResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueryWorkflow", arg0, arg1) + ret0, _ := ret[0].(*history.QueryWorkflowResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// QueryWorkflow indicates an expected call of QueryWorkflow +func (mr *MockHandlerMockRecorder) QueryWorkflow(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryWorkflow", reflect.TypeOf((*MockHandler)(nil).QueryWorkflow), arg0, arg1) +} + +// ReadDLQMessages mocks base method +func (m *MockHandler) ReadDLQMessages(arg0 context.Context, arg1 *replicator.ReadDLQMessagesRequest) (*replicator.ReadDLQMessagesResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadDLQMessages", arg0, arg1) + ret0, _ := ret[0].(*replicator.ReadDLQMessagesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadDLQMessages indicates an expected call of ReadDLQMessages +func (mr *MockHandlerMockRecorder) ReadDLQMessages(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadDLQMessages", reflect.TypeOf((*MockHandler)(nil).ReadDLQMessages), arg0, arg1) +} + +// ReapplyEvents mocks base method +func (m *MockHandler) ReapplyEvents(arg0 context.Context, arg1 *history.ReapplyEventsRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReapplyEvents", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReapplyEvents indicates an expected call of ReapplyEvents +func (mr *MockHandlerMockRecorder) ReapplyEvents(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReapplyEvents", reflect.TypeOf((*MockHandler)(nil).ReapplyEvents), arg0, arg1) +} + +// RecordActivityTaskHeartbeat mocks base method +func (m *MockHandler) RecordActivityTaskHeartbeat(arg0 context.Context, arg1 *history.RecordActivityTaskHeartbeatRequest) (*shared.RecordActivityTaskHeartbeatResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecordActivityTaskHeartbeat", arg0, arg1) + ret0, _ := ret[0].(*shared.RecordActivityTaskHeartbeatResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RecordActivityTaskHeartbeat indicates an expected call of RecordActivityTaskHeartbeat +func (mr *MockHandlerMockRecorder) RecordActivityTaskHeartbeat(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordActivityTaskHeartbeat", reflect.TypeOf((*MockHandler)(nil).RecordActivityTaskHeartbeat), arg0, arg1) +} + +// RecordActivityTaskStarted mocks base method +func (m *MockHandler) RecordActivityTaskStarted(arg0 context.Context, arg1 *history.RecordActivityTaskStartedRequest) (*history.RecordActivityTaskStartedResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecordActivityTaskStarted", arg0, arg1) + ret0, _ := ret[0].(*history.RecordActivityTaskStartedResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RecordActivityTaskStarted indicates an expected call of RecordActivityTaskStarted +func (mr *MockHandlerMockRecorder) RecordActivityTaskStarted(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordActivityTaskStarted", reflect.TypeOf((*MockHandler)(nil).RecordActivityTaskStarted), arg0, arg1) +} + +// RecordChildExecutionCompleted mocks base method +func (m *MockHandler) RecordChildExecutionCompleted(arg0 context.Context, arg1 *history.RecordChildExecutionCompletedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecordChildExecutionCompleted", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecordChildExecutionCompleted indicates an expected call of RecordChildExecutionCompleted +func (mr *MockHandlerMockRecorder) RecordChildExecutionCompleted(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordChildExecutionCompleted", reflect.TypeOf((*MockHandler)(nil).RecordChildExecutionCompleted), arg0, arg1) +} + +// RecordDecisionTaskStarted mocks base method +func (m *MockHandler) RecordDecisionTaskStarted(arg0 context.Context, arg1 *history.RecordDecisionTaskStartedRequest) (*history.RecordDecisionTaskStartedResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecordDecisionTaskStarted", arg0, arg1) + ret0, _ := ret[0].(*history.RecordDecisionTaskStartedResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RecordDecisionTaskStarted indicates an expected call of RecordDecisionTaskStarted +func (mr *MockHandlerMockRecorder) RecordDecisionTaskStarted(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordDecisionTaskStarted", reflect.TypeOf((*MockHandler)(nil).RecordDecisionTaskStarted), arg0, arg1) +} + +// RefreshWorkflowTasks mocks base method +func (m *MockHandler) RefreshWorkflowTasks(arg0 context.Context, arg1 *history.RefreshWorkflowTasksRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RefreshWorkflowTasks", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RefreshWorkflowTasks indicates an expected call of RefreshWorkflowTasks +func (mr *MockHandlerMockRecorder) RefreshWorkflowTasks(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshWorkflowTasks", reflect.TypeOf((*MockHandler)(nil).RefreshWorkflowTasks), arg0, arg1) +} + +// RemoveSignalMutableState mocks base method +func (m *MockHandler) RemoveSignalMutableState(arg0 context.Context, arg1 *history.RemoveSignalMutableStateRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveSignalMutableState", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveSignalMutableState indicates an expected call of RemoveSignalMutableState +func (mr *MockHandlerMockRecorder) RemoveSignalMutableState(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveSignalMutableState", reflect.TypeOf((*MockHandler)(nil).RemoveSignalMutableState), arg0, arg1) +} + +// RemoveTask mocks base method +func (m *MockHandler) RemoveTask(arg0 context.Context, arg1 *shared.RemoveTaskRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveTask", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveTask indicates an expected call of RemoveTask +func (mr *MockHandlerMockRecorder) RemoveTask(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveTask", reflect.TypeOf((*MockHandler)(nil).RemoveTask), arg0, arg1) +} + +// ReplicateEventsV2 mocks base method +func (m *MockHandler) ReplicateEventsV2(arg0 context.Context, arg1 *history.ReplicateEventsV2Request) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReplicateEventsV2", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReplicateEventsV2 indicates an expected call of ReplicateEventsV2 +func (mr *MockHandlerMockRecorder) ReplicateEventsV2(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReplicateEventsV2", reflect.TypeOf((*MockHandler)(nil).ReplicateEventsV2), arg0, arg1) +} + +// RequestCancelWorkflowExecution mocks base method +func (m *MockHandler) RequestCancelWorkflowExecution(arg0 context.Context, arg1 *history.RequestCancelWorkflowExecutionRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequestCancelWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RequestCancelWorkflowExecution indicates an expected call of RequestCancelWorkflowExecution +func (mr *MockHandlerMockRecorder) RequestCancelWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestCancelWorkflowExecution", reflect.TypeOf((*MockHandler)(nil).RequestCancelWorkflowExecution), arg0, arg1) +} + +// ResetQueue mocks base method +func (m *MockHandler) ResetQueue(arg0 context.Context, arg1 *shared.ResetQueueRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ResetQueue", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ResetQueue indicates an expected call of ResetQueue +func (mr *MockHandlerMockRecorder) ResetQueue(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetQueue", reflect.TypeOf((*MockHandler)(nil).ResetQueue), arg0, arg1) +} + +// ResetStickyTaskList mocks base method +func (m *MockHandler) ResetStickyTaskList(arg0 context.Context, arg1 *history.ResetStickyTaskListRequest) (*history.ResetStickyTaskListResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ResetStickyTaskList", arg0, arg1) + ret0, _ := ret[0].(*history.ResetStickyTaskListResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ResetStickyTaskList indicates an expected call of ResetStickyTaskList +func (mr *MockHandlerMockRecorder) ResetStickyTaskList(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetStickyTaskList", reflect.TypeOf((*MockHandler)(nil).ResetStickyTaskList), arg0, arg1) +} + +// ResetWorkflowExecution mocks base method +func (m *MockHandler) ResetWorkflowExecution(arg0 context.Context, arg1 *history.ResetWorkflowExecutionRequest) (*shared.ResetWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ResetWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(*shared.ResetWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ResetWorkflowExecution indicates an expected call of ResetWorkflowExecution +func (mr *MockHandlerMockRecorder) ResetWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetWorkflowExecution", reflect.TypeOf((*MockHandler)(nil).ResetWorkflowExecution), arg0, arg1) +} + +// RespondActivityTaskCanceled mocks base method +func (m *MockHandler) RespondActivityTaskCanceled(arg0 context.Context, arg1 *history.RespondActivityTaskCanceledRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RespondActivityTaskCanceled", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RespondActivityTaskCanceled indicates an expected call of RespondActivityTaskCanceled +func (mr *MockHandlerMockRecorder) RespondActivityTaskCanceled(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondActivityTaskCanceled", reflect.TypeOf((*MockHandler)(nil).RespondActivityTaskCanceled), arg0, arg1) +} + +// RespondActivityTaskCompleted mocks base method +func (m *MockHandler) RespondActivityTaskCompleted(arg0 context.Context, arg1 *history.RespondActivityTaskCompletedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RespondActivityTaskCompleted", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RespondActivityTaskCompleted indicates an expected call of RespondActivityTaskCompleted +func (mr *MockHandlerMockRecorder) RespondActivityTaskCompleted(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondActivityTaskCompleted", reflect.TypeOf((*MockHandler)(nil).RespondActivityTaskCompleted), arg0, arg1) +} + +// RespondActivityTaskFailed mocks base method +func (m *MockHandler) RespondActivityTaskFailed(arg0 context.Context, arg1 *history.RespondActivityTaskFailedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RespondActivityTaskFailed", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RespondActivityTaskFailed indicates an expected call of RespondActivityTaskFailed +func (mr *MockHandlerMockRecorder) RespondActivityTaskFailed(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondActivityTaskFailed", reflect.TypeOf((*MockHandler)(nil).RespondActivityTaskFailed), arg0, arg1) +} + +// RespondDecisionTaskCompleted mocks base method +func (m *MockHandler) RespondDecisionTaskCompleted(arg0 context.Context, arg1 *history.RespondDecisionTaskCompletedRequest) (*history.RespondDecisionTaskCompletedResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RespondDecisionTaskCompleted", arg0, arg1) + ret0, _ := ret[0].(*history.RespondDecisionTaskCompletedResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RespondDecisionTaskCompleted indicates an expected call of RespondDecisionTaskCompleted +func (mr *MockHandlerMockRecorder) RespondDecisionTaskCompleted(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondDecisionTaskCompleted", reflect.TypeOf((*MockHandler)(nil).RespondDecisionTaskCompleted), arg0, arg1) +} + +// RespondDecisionTaskFailed mocks base method +func (m *MockHandler) RespondDecisionTaskFailed(arg0 context.Context, arg1 *history.RespondDecisionTaskFailedRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RespondDecisionTaskFailed", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// RespondDecisionTaskFailed indicates an expected call of RespondDecisionTaskFailed +func (mr *MockHandlerMockRecorder) RespondDecisionTaskFailed(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RespondDecisionTaskFailed", reflect.TypeOf((*MockHandler)(nil).RespondDecisionTaskFailed), arg0, arg1) +} + +// ScheduleDecisionTask mocks base method +func (m *MockHandler) ScheduleDecisionTask(arg0 context.Context, arg1 *history.ScheduleDecisionTaskRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ScheduleDecisionTask", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ScheduleDecisionTask indicates an expected call of ScheduleDecisionTask +func (mr *MockHandlerMockRecorder) ScheduleDecisionTask(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScheduleDecisionTask", reflect.TypeOf((*MockHandler)(nil).ScheduleDecisionTask), arg0, arg1) +} + +// SignalWithStartWorkflowExecution mocks base method +func (m *MockHandler) SignalWithStartWorkflowExecution(arg0 context.Context, arg1 *history.SignalWithStartWorkflowExecutionRequest) (*shared.StartWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SignalWithStartWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(*shared.StartWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SignalWithStartWorkflowExecution indicates an expected call of SignalWithStartWorkflowExecution +func (mr *MockHandlerMockRecorder) SignalWithStartWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SignalWithStartWorkflowExecution", reflect.TypeOf((*MockHandler)(nil).SignalWithStartWorkflowExecution), arg0, arg1) +} + +// SignalWorkflowExecution mocks base method +func (m *MockHandler) SignalWorkflowExecution(arg0 context.Context, arg1 *history.SignalWorkflowExecutionRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SignalWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SignalWorkflowExecution indicates an expected call of SignalWorkflowExecution +func (mr *MockHandlerMockRecorder) SignalWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SignalWorkflowExecution", reflect.TypeOf((*MockHandler)(nil).SignalWorkflowExecution), arg0, arg1) +} + +// StartWorkflowExecution mocks base method +func (m *MockHandler) StartWorkflowExecution(arg0 context.Context, arg1 *history.StartWorkflowExecutionRequest) (*shared.StartWorkflowExecutionResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StartWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(*shared.StartWorkflowExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StartWorkflowExecution indicates an expected call of StartWorkflowExecution +func (mr *MockHandlerMockRecorder) StartWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartWorkflowExecution", reflect.TypeOf((*MockHandler)(nil).StartWorkflowExecution), arg0, arg1) +} + +// SyncActivity mocks base method +func (m *MockHandler) SyncActivity(arg0 context.Context, arg1 *history.SyncActivityRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncActivity", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncActivity indicates an expected call of SyncActivity +func (mr *MockHandlerMockRecorder) SyncActivity(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncActivity", reflect.TypeOf((*MockHandler)(nil).SyncActivity), arg0, arg1) +} + +// SyncShardStatus mocks base method +func (m *MockHandler) SyncShardStatus(arg0 context.Context, arg1 *history.SyncShardStatusRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncShardStatus", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncShardStatus indicates an expected call of SyncShardStatus +func (mr *MockHandlerMockRecorder) SyncShardStatus(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncShardStatus", reflect.TypeOf((*MockHandler)(nil).SyncShardStatus), arg0, arg1) +} + +// TerminateWorkflowExecution mocks base method +func (m *MockHandler) TerminateWorkflowExecution(arg0 context.Context, arg1 *history.TerminateWorkflowExecutionRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TerminateWorkflowExecution", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// TerminateWorkflowExecution indicates an expected call of TerminateWorkflowExecution +func (mr *MockHandlerMockRecorder) TerminateWorkflowExecution(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TerminateWorkflowExecution", reflect.TypeOf((*MockHandler)(nil).TerminateWorkflowExecution), arg0, arg1) +} diff --git a/service/history/service.go b/service/history/service.go index a946c23e3b5..afe62ff9ce0 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -42,7 +42,7 @@ type Service struct { resource.Resource status int32 - handler *Handler + handler *handlerImpl stopC chan struct{} params *service.BootstrapParams config *config.Config @@ -123,7 +123,9 @@ func (s *Service) Start() { logger.Info("history starting") s.handler = NewHandler(s.Resource, s.config) - s.handler.RegisterHandler() + + thriftHandler := NewThriftHandler(s.handler) + thriftHandler.register(s.GetDispatcher()) // must start resource first s.Resource.Start() diff --git a/service/history/thriftHandler.go b/service/history/thriftHandler.go new file mode 100644 index 00000000000..c96214219a6 --- /dev/null +++ b/service/history/thriftHandler.go @@ -0,0 +1,249 @@ +// Copyright (c) 2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/health" + "github.com/uber/cadence/.gen/go/health/metaserver" + h "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/history/historyserviceserver" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" +) + +// ThriftHandler wrap underlying handler and handles Thrift related type conversions +type ThriftHandler struct { + h Handler +} + +// NewThriftHandler creates Thrift handler on top of underlying handler +func NewThriftHandler(h Handler) ThriftHandler { + return ThriftHandler{h} +} + +func (t ThriftHandler) register(dispatcher *yarpc.Dispatcher) { + dispatcher.Register(historyserviceserver.New(&t)) + dispatcher.Register(metaserver.New(&t)) +} + +// Health forwards request to the underlying handler +func (t ThriftHandler) Health(ctx context.Context) (response *health.HealthStatus, err error) { + return t.h.Health(ctx) +} + +// CloseShard forwards request to the underlying handler +func (t ThriftHandler) CloseShard(ctx context.Context, request *shared.CloseShardRequest) (err error) { + return t.h.CloseShard(ctx, request) +} + +// DescribeHistoryHost forwards request to the underlying handler +func (t ThriftHandler) DescribeHistoryHost(ctx context.Context, request *shared.DescribeHistoryHostRequest) (response *shared.DescribeHistoryHostResponse, err error) { + return t.h.DescribeHistoryHost(ctx, request) +} + +// DescribeMutableState forwards request to the underlying handler +func (t ThriftHandler) DescribeMutableState(ctx context.Context, request *h.DescribeMutableStateRequest) (response *h.DescribeMutableStateResponse, err error) { + return t.h.DescribeMutableState(ctx, request) +} + +// DescribeQueue forwards request to the underlying handler +func (t ThriftHandler) DescribeQueue(ctx context.Context, request *shared.DescribeQueueRequest) (response *shared.DescribeQueueResponse, err error) { + return t.h.DescribeQueue(ctx, request) +} + +// DescribeWorkflowExecution forwards request to the underlying handler +func (t ThriftHandler) DescribeWorkflowExecution(ctx context.Context, request *h.DescribeWorkflowExecutionRequest) (response *shared.DescribeWorkflowExecutionResponse, err error) { + return t.h.DescribeWorkflowExecution(ctx, request) +} + +// GetDLQReplicationMessages forwards request to the underlying handler +func (t ThriftHandler) GetDLQReplicationMessages(ctx context.Context, request *replicator.GetDLQReplicationMessagesRequest) (response *replicator.GetDLQReplicationMessagesResponse, err error) { + return t.h.GetDLQReplicationMessages(ctx, request) +} + +// GetMutableState forwards request to the underlying handler +func (t ThriftHandler) GetMutableState(ctx context.Context, request *h.GetMutableStateRequest) (response *h.GetMutableStateResponse, err error) { + return t.h.GetMutableState(ctx, request) +} + +// GetReplicationMessages forwards request to the underlying handler +func (t ThriftHandler) GetReplicationMessages(ctx context.Context, request *replicator.GetReplicationMessagesRequest) (response *replicator.GetReplicationMessagesResponse, err error) { + return t.h.GetReplicationMessages(ctx, request) +} + +// MergeDLQMessages forwards request to the underlying handler +func (t ThriftHandler) MergeDLQMessages(ctx context.Context, request *replicator.MergeDLQMessagesRequest) (response *replicator.MergeDLQMessagesResponse, err error) { + return t.h.MergeDLQMessages(ctx, request) +} + +// NotifyFailoverMarkers forwards request to the underlying handler +func (t ThriftHandler) NotifyFailoverMarkers(ctx context.Context, request *h.NotifyFailoverMarkersRequest) (err error) { + return t.h.NotifyFailoverMarkers(ctx, request) +} + +// PollMutableState forwards request to the underlying handler +func (t ThriftHandler) PollMutableState(ctx context.Context, request *h.PollMutableStateRequest) (response *h.PollMutableStateResponse, err error) { + return t.h.PollMutableState(ctx, request) +} + +// PurgeDLQMessages forwards request to the underlying handler +func (t ThriftHandler) PurgeDLQMessages(ctx context.Context, request *replicator.PurgeDLQMessagesRequest) (err error) { + return t.h.PurgeDLQMessages(ctx, request) +} + +// QueryWorkflow forwards request to the underlying handler +func (t ThriftHandler) QueryWorkflow(ctx context.Context, request *h.QueryWorkflowRequest) (response *h.QueryWorkflowResponse, err error) { + return t.h.QueryWorkflow(ctx, request) +} + +// ReadDLQMessages forwards request to the underlying handler +func (t ThriftHandler) ReadDLQMessages(ctx context.Context, request *replicator.ReadDLQMessagesRequest) (response *replicator.ReadDLQMessagesResponse, err error) { + return t.h.ReadDLQMessages(ctx, request) +} + +// ReapplyEvents forwards request to the underlying handler +func (t ThriftHandler) ReapplyEvents(ctx context.Context, request *h.ReapplyEventsRequest) (err error) { + return t.h.ReapplyEvents(ctx, request) +} + +// RecordActivityTaskHeartbeat forwards request to the underlying handler +func (t ThriftHandler) RecordActivityTaskHeartbeat(ctx context.Context, request *h.RecordActivityTaskHeartbeatRequest) (response *shared.RecordActivityTaskHeartbeatResponse, err error) { + return t.h.RecordActivityTaskHeartbeat(ctx, request) +} + +// RecordActivityTaskStarted forwards request to the underlying handler +func (t ThriftHandler) RecordActivityTaskStarted(ctx context.Context, request *h.RecordActivityTaskStartedRequest) (response *h.RecordActivityTaskStartedResponse, err error) { + return t.h.RecordActivityTaskStarted(ctx, request) +} + +// RecordChildExecutionCompleted forwards request to the underlying handler +func (t ThriftHandler) RecordChildExecutionCompleted(ctx context.Context, request *h.RecordChildExecutionCompletedRequest) (err error) { + return t.h.RecordChildExecutionCompleted(ctx, request) +} + +// RecordDecisionTaskStarted forwards request to the underlying handler +func (t ThriftHandler) RecordDecisionTaskStarted(ctx context.Context, request *h.RecordDecisionTaskStartedRequest) (response *h.RecordDecisionTaskStartedResponse, err error) { + return t.h.RecordDecisionTaskStarted(ctx, request) +} + +// RefreshWorkflowTasks forwards request to the underlying handler +func (t ThriftHandler) RefreshWorkflowTasks(ctx context.Context, request *h.RefreshWorkflowTasksRequest) (err error) { + return t.h.RefreshWorkflowTasks(ctx, request) +} + +// RemoveSignalMutableState forwards request to the underlying handler +func (t ThriftHandler) RemoveSignalMutableState(ctx context.Context, request *h.RemoveSignalMutableStateRequest) (err error) { + return t.h.RemoveSignalMutableState(ctx, request) +} + +// RemoveTask forwards request to the underlying handler +func (t ThriftHandler) RemoveTask(ctx context.Context, request *shared.RemoveTaskRequest) (err error) { + return t.h.RemoveTask(ctx, request) +} + +// ReplicateEventsV2 forwards request to the underlying handler +func (t ThriftHandler) ReplicateEventsV2(ctx context.Context, request *h.ReplicateEventsV2Request) (err error) { + return t.h.ReplicateEventsV2(ctx, request) +} + +// RequestCancelWorkflowExecution forwards request to the underlying handler +func (t ThriftHandler) RequestCancelWorkflowExecution(ctx context.Context, request *h.RequestCancelWorkflowExecutionRequest) (err error) { + return t.h.RequestCancelWorkflowExecution(ctx, request) +} + +// ResetQueue forwards request to the underlying handler +func (t ThriftHandler) ResetQueue(ctx context.Context, request *shared.ResetQueueRequest) (err error) { + return t.h.ResetQueue(ctx, request) +} + +// ResetStickyTaskList forwards request to the underlying handler +func (t ThriftHandler) ResetStickyTaskList(ctx context.Context, request *h.ResetStickyTaskListRequest) (response *h.ResetStickyTaskListResponse, err error) { + return t.h.ResetStickyTaskList(ctx, request) +} + +// ResetWorkflowExecution forwards request to the underlying handler +func (t ThriftHandler) ResetWorkflowExecution(ctx context.Context, request *h.ResetWorkflowExecutionRequest) (response *shared.ResetWorkflowExecutionResponse, err error) { + return t.h.ResetWorkflowExecution(ctx, request) +} + +// RespondActivityTaskCanceled forwards request to the underlying handler +func (t ThriftHandler) RespondActivityTaskCanceled(ctx context.Context, request *h.RespondActivityTaskCanceledRequest) (err error) { + return t.h.RespondActivityTaskCanceled(ctx, request) +} + +// RespondActivityTaskCompleted forwards request to the underlying handler +func (t ThriftHandler) RespondActivityTaskCompleted(ctx context.Context, request *h.RespondActivityTaskCompletedRequest) (err error) { + return t.h.RespondActivityTaskCompleted(ctx, request) +} + +// RespondActivityTaskFailed forwards request to the underlying handler +func (t ThriftHandler) RespondActivityTaskFailed(ctx context.Context, request *h.RespondActivityTaskFailedRequest) (err error) { + return t.h.RespondActivityTaskFailed(ctx, request) +} + +// RespondDecisionTaskCompleted forwards request to the underlying handler +func (t ThriftHandler) RespondDecisionTaskCompleted(ctx context.Context, request *h.RespondDecisionTaskCompletedRequest) (response *h.RespondDecisionTaskCompletedResponse, err error) { + return t.h.RespondDecisionTaskCompleted(ctx, request) +} + +// RespondDecisionTaskFailed forwards request to the underlying handler +func (t ThriftHandler) RespondDecisionTaskFailed(ctx context.Context, request *h.RespondDecisionTaskFailedRequest) (err error) { + return t.h.RespondDecisionTaskFailed(ctx, request) +} + +// ScheduleDecisionTask forwards request to the underlying handler +func (t ThriftHandler) ScheduleDecisionTask(ctx context.Context, request *h.ScheduleDecisionTaskRequest) (err error) { + return t.h.ScheduleDecisionTask(ctx, request) +} + +// SignalWithStartWorkflowExecution forwards request to the underlying handler +func (t ThriftHandler) SignalWithStartWorkflowExecution(ctx context.Context, request *h.SignalWithStartWorkflowExecutionRequest) (response *shared.StartWorkflowExecutionResponse, err error) { + return t.h.SignalWithStartWorkflowExecution(ctx, request) +} + +// SignalWorkflowExecution forwards request to the underlying handler +func (t ThriftHandler) SignalWorkflowExecution(ctx context.Context, request *h.SignalWorkflowExecutionRequest) (err error) { + return t.h.SignalWorkflowExecution(ctx, request) +} + +// StartWorkflowExecution forwards request to the underlying handler +func (t ThriftHandler) StartWorkflowExecution(ctx context.Context, request *h.StartWorkflowExecutionRequest) (response *shared.StartWorkflowExecutionResponse, err error) { + return t.h.StartWorkflowExecution(ctx, request) +} + +// SyncActivity forwards request to the underlying handler +func (t ThriftHandler) SyncActivity(ctx context.Context, request *h.SyncActivityRequest) (err error) { + return t.h.SyncActivity(ctx, request) +} + +// SyncShardStatus forwards request to the underlying handler +func (t ThriftHandler) SyncShardStatus(ctx context.Context, request *h.SyncShardStatusRequest) (err error) { + return t.h.SyncShardStatus(ctx, request) +} + +// TerminateWorkflowExecution forwards request to the underlying handler +func (t ThriftHandler) TerminateWorkflowExecution(ctx context.Context, request *h.TerminateWorkflowExecutionRequest) (err error) { + return t.h.TerminateWorkflowExecution(ctx, request) +} diff --git a/service/history/thriftHandler_test.go b/service/history/thriftHandler_test.go new file mode 100644 index 00000000000..781411e5e89 --- /dev/null +++ b/service/history/thriftHandler_test.go @@ -0,0 +1,264 @@ +// Copyright (c) 2020 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "testing" + + "github.com/uber/cadence/.gen/go/health" + hist "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +func TestThriftHandler(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + h := NewMockHandler(ctrl) + th := NewThriftHandler(h) + ctx := context.Background() + + t.Run("Health", func(t *testing.T) { + h.EXPECT().Health(ctx).Return(&health.HealthStatus{}, assert.AnError).Times(1) + resp, err := th.Health(ctx) + assert.Equal(t, health.HealthStatus{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("CloseShard", func(t *testing.T) { + h.EXPECT().CloseShard(ctx, &shared.CloseShardRequest{}).Return(assert.AnError).Times(1) + err := th.CloseShard(ctx, &shared.CloseShardRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("DescribeHistoryHost", func(t *testing.T) { + h.EXPECT().DescribeHistoryHost(ctx, &shared.DescribeHistoryHostRequest{}).Return(&shared.DescribeHistoryHostResponse{}, assert.AnError).Times(1) + resp, err := th.DescribeHistoryHost(ctx, &shared.DescribeHistoryHostRequest{}) + assert.Equal(t, shared.DescribeHistoryHostResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("DescribeMutableState", func(t *testing.T) { + h.EXPECT().DescribeMutableState(ctx, &hist.DescribeMutableStateRequest{}).Return(&hist.DescribeMutableStateResponse{}, assert.AnError).Times(1) + resp, err := th.DescribeMutableState(ctx, &hist.DescribeMutableStateRequest{}) + assert.Equal(t, hist.DescribeMutableStateResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("DescribeQueue", func(t *testing.T) { + h.EXPECT().DescribeQueue(ctx, &shared.DescribeQueueRequest{}).Return(&shared.DescribeQueueResponse{}, assert.AnError).Times(1) + resp, err := th.DescribeQueue(ctx, &shared.DescribeQueueRequest{}) + assert.Equal(t, shared.DescribeQueueResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("DescribeWorkflowExecution", func(t *testing.T) { + h.EXPECT().DescribeWorkflowExecution(ctx, &hist.DescribeWorkflowExecutionRequest{}).Return(&shared.DescribeWorkflowExecutionResponse{}, assert.AnError).Times(1) + resp, err := th.DescribeWorkflowExecution(ctx, &hist.DescribeWorkflowExecutionRequest{}) + assert.Equal(t, shared.DescribeWorkflowExecutionResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("GetDLQReplicationMessages", func(t *testing.T) { + h.EXPECT().GetDLQReplicationMessages(ctx, &replicator.GetDLQReplicationMessagesRequest{}).Return(&replicator.GetDLQReplicationMessagesResponse{}, assert.AnError).Times(1) + resp, err := th.GetDLQReplicationMessages(ctx, &replicator.GetDLQReplicationMessagesRequest{}) + assert.Equal(t, replicator.GetDLQReplicationMessagesResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("GetMutableState", func(t *testing.T) { + h.EXPECT().GetMutableState(ctx, &hist.GetMutableStateRequest{}).Return(&hist.GetMutableStateResponse{}, assert.AnError).Times(1) + resp, err := th.GetMutableState(ctx, &hist.GetMutableStateRequest{}) + assert.Equal(t, hist.GetMutableStateResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("GetReplicationMessages", func(t *testing.T) { + h.EXPECT().GetReplicationMessages(ctx, &replicator.GetReplicationMessagesRequest{}).Return(&replicator.GetReplicationMessagesResponse{}, assert.AnError).Times(1) + resp, err := th.GetReplicationMessages(ctx, &replicator.GetReplicationMessagesRequest{}) + assert.Equal(t, replicator.GetReplicationMessagesResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("MergeDLQMessages", func(t *testing.T) { + h.EXPECT().MergeDLQMessages(ctx, &replicator.MergeDLQMessagesRequest{}).Return(&replicator.MergeDLQMessagesResponse{}, assert.AnError).Times(1) + resp, err := th.MergeDLQMessages(ctx, &replicator.MergeDLQMessagesRequest{}) + assert.Equal(t, replicator.MergeDLQMessagesResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("NotifyFailoverMarkers", func(t *testing.T) { + h.EXPECT().NotifyFailoverMarkers(ctx, &hist.NotifyFailoverMarkersRequest{}).Return(assert.AnError).Times(1) + err := th.NotifyFailoverMarkers(ctx, &hist.NotifyFailoverMarkersRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("PollMutableState", func(t *testing.T) { + h.EXPECT().PollMutableState(ctx, &hist.PollMutableStateRequest{}).Return(&hist.PollMutableStateResponse{}, assert.AnError).Times(1) + resp, err := th.PollMutableState(ctx, &hist.PollMutableStateRequest{}) + assert.Equal(t, hist.PollMutableStateResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("PurgeDLQMessages", func(t *testing.T) { + h.EXPECT().PurgeDLQMessages(ctx, &replicator.PurgeDLQMessagesRequest{}).Return(assert.AnError).Times(1) + err := th.PurgeDLQMessages(ctx, &replicator.PurgeDLQMessagesRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("QueryWorkflow", func(t *testing.T) { + h.EXPECT().QueryWorkflow(ctx, &hist.QueryWorkflowRequest{}).Return(&hist.QueryWorkflowResponse{}, assert.AnError).Times(1) + resp, err := th.QueryWorkflow(ctx, &hist.QueryWorkflowRequest{}) + assert.Equal(t, hist.QueryWorkflowResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("ReadDLQMessages", func(t *testing.T) { + h.EXPECT().ReadDLQMessages(ctx, &replicator.ReadDLQMessagesRequest{}).Return(&replicator.ReadDLQMessagesResponse{}, assert.AnError).Times(1) + resp, err := th.ReadDLQMessages(ctx, &replicator.ReadDLQMessagesRequest{}) + assert.Equal(t, replicator.ReadDLQMessagesResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("ReapplyEvents", func(t *testing.T) { + h.EXPECT().ReapplyEvents(ctx, &hist.ReapplyEventsRequest{}).Return(assert.AnError).Times(1) + err := th.ReapplyEvents(ctx, &hist.ReapplyEventsRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RecordActivityTaskHeartbeat", func(t *testing.T) { + h.EXPECT().RecordActivityTaskHeartbeat(ctx, &hist.RecordActivityTaskHeartbeatRequest{}).Return(&shared.RecordActivityTaskHeartbeatResponse{}, assert.AnError).Times(1) + resp, err := th.RecordActivityTaskHeartbeat(ctx, &hist.RecordActivityTaskHeartbeatRequest{}) + assert.Equal(t, shared.RecordActivityTaskHeartbeatResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RecordActivityTaskStarted", func(t *testing.T) { + h.EXPECT().RecordActivityTaskStarted(ctx, &hist.RecordActivityTaskStartedRequest{}).Return(&hist.RecordActivityTaskStartedResponse{}, assert.AnError).Times(1) + resp, err := th.RecordActivityTaskStarted(ctx, &hist.RecordActivityTaskStartedRequest{}) + assert.Equal(t, hist.RecordActivityTaskStartedResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RecordChildExecutionCompleted", func(t *testing.T) { + h.EXPECT().RecordChildExecutionCompleted(ctx, &hist.RecordChildExecutionCompletedRequest{}).Return(assert.AnError).Times(1) + err := th.RecordChildExecutionCompleted(ctx, &hist.RecordChildExecutionCompletedRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RecordDecisionTaskStarted", func(t *testing.T) { + h.EXPECT().RecordDecisionTaskStarted(ctx, &hist.RecordDecisionTaskStartedRequest{}).Return(&hist.RecordDecisionTaskStartedResponse{}, assert.AnError).Times(1) + resp, err := th.RecordDecisionTaskStarted(ctx, &hist.RecordDecisionTaskStartedRequest{}) + assert.Equal(t, hist.RecordDecisionTaskStartedResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RefreshWorkflowTasks", func(t *testing.T) { + h.EXPECT().RefreshWorkflowTasks(ctx, &hist.RefreshWorkflowTasksRequest{}).Return(assert.AnError).Times(1) + err := th.RefreshWorkflowTasks(ctx, &hist.RefreshWorkflowTasksRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RemoveSignalMutableState", func(t *testing.T) { + h.EXPECT().RemoveSignalMutableState(ctx, &hist.RemoveSignalMutableStateRequest{}).Return(assert.AnError).Times(1) + err := th.RemoveSignalMutableState(ctx, &hist.RemoveSignalMutableStateRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RemoveTask", func(t *testing.T) { + h.EXPECT().RemoveTask(ctx, &shared.RemoveTaskRequest{}).Return(assert.AnError).Times(1) + err := th.RemoveTask(ctx, &shared.RemoveTaskRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("ReplicateEventsV2", func(t *testing.T) { + h.EXPECT().ReplicateEventsV2(ctx, &hist.ReplicateEventsV2Request{}).Return(assert.AnError).Times(1) + err := th.ReplicateEventsV2(ctx, &hist.ReplicateEventsV2Request{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RequestCancelWorkflowExecution", func(t *testing.T) { + h.EXPECT().RequestCancelWorkflowExecution(ctx, &hist.RequestCancelWorkflowExecutionRequest{}).Return(assert.AnError).Times(1) + err := th.RequestCancelWorkflowExecution(ctx, &hist.RequestCancelWorkflowExecutionRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("ResetQueue", func(t *testing.T) { + h.EXPECT().ResetQueue(ctx, &shared.ResetQueueRequest{}).Return(assert.AnError).Times(1) + err := th.ResetQueue(ctx, &shared.ResetQueueRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("ResetStickyTaskList", func(t *testing.T) { + h.EXPECT().ResetStickyTaskList(ctx, &hist.ResetStickyTaskListRequest{}).Return(&hist.ResetStickyTaskListResponse{}, assert.AnError).Times(1) + resp, err := th.ResetStickyTaskList(ctx, &hist.ResetStickyTaskListRequest{}) + assert.Equal(t, hist.ResetStickyTaskListResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("ResetWorkflowExecution", func(t *testing.T) { + h.EXPECT().ResetWorkflowExecution(ctx, &hist.ResetWorkflowExecutionRequest{}).Return(&shared.ResetWorkflowExecutionResponse{}, assert.AnError).Times(1) + resp, err := th.ResetWorkflowExecution(ctx, &hist.ResetWorkflowExecutionRequest{}) + assert.Equal(t, shared.ResetWorkflowExecutionResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RespondActivityTaskCanceled", func(t *testing.T) { + h.EXPECT().RespondActivityTaskCanceled(ctx, &hist.RespondActivityTaskCanceledRequest{}).Return(assert.AnError).Times(1) + err := th.RespondActivityTaskCanceled(ctx, &hist.RespondActivityTaskCanceledRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RespondActivityTaskCompleted", func(t *testing.T) { + h.EXPECT().RespondActivityTaskCompleted(ctx, &hist.RespondActivityTaskCompletedRequest{}).Return(assert.AnError).Times(1) + err := th.RespondActivityTaskCompleted(ctx, &hist.RespondActivityTaskCompletedRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RespondActivityTaskFailed", func(t *testing.T) { + h.EXPECT().RespondActivityTaskFailed(ctx, &hist.RespondActivityTaskFailedRequest{}).Return(assert.AnError).Times(1) + err := th.RespondActivityTaskFailed(ctx, &hist.RespondActivityTaskFailedRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RespondDecisionTaskCompleted", func(t *testing.T) { + h.EXPECT().RespondDecisionTaskCompleted(ctx, &hist.RespondDecisionTaskCompletedRequest{}).Return(&hist.RespondDecisionTaskCompletedResponse{}, assert.AnError).Times(1) + resp, err := th.RespondDecisionTaskCompleted(ctx, &hist.RespondDecisionTaskCompletedRequest{}) + assert.Equal(t, hist.RespondDecisionTaskCompletedResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("RespondDecisionTaskFailed", func(t *testing.T) { + h.EXPECT().RespondDecisionTaskFailed(ctx, &hist.RespondDecisionTaskFailedRequest{}).Return(assert.AnError).Times(1) + err := th.RespondDecisionTaskFailed(ctx, &hist.RespondDecisionTaskFailedRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("ScheduleDecisionTask", func(t *testing.T) { + h.EXPECT().ScheduleDecisionTask(ctx, &hist.ScheduleDecisionTaskRequest{}).Return(assert.AnError).Times(1) + err := th.ScheduleDecisionTask(ctx, &hist.ScheduleDecisionTaskRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("SignalWithStartWorkflowExecution", func(t *testing.T) { + h.EXPECT().SignalWithStartWorkflowExecution(ctx, &hist.SignalWithStartWorkflowExecutionRequest{}).Return(&shared.StartWorkflowExecutionResponse{}, assert.AnError).Times(1) + resp, err := th.SignalWithStartWorkflowExecution(ctx, &hist.SignalWithStartWorkflowExecutionRequest{}) + assert.Equal(t, shared.StartWorkflowExecutionResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("SignalWorkflowExecution", func(t *testing.T) { + h.EXPECT().SignalWorkflowExecution(ctx, &hist.SignalWorkflowExecutionRequest{}).Return(assert.AnError).Times(1) + err := th.SignalWorkflowExecution(ctx, &hist.SignalWorkflowExecutionRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("StartWorkflowExecution", func(t *testing.T) { + h.EXPECT().StartWorkflowExecution(ctx, &hist.StartWorkflowExecutionRequest{}).Return(&shared.StartWorkflowExecutionResponse{}, assert.AnError).Times(1) + resp, err := th.StartWorkflowExecution(ctx, &hist.StartWorkflowExecutionRequest{}) + assert.Equal(t, shared.StartWorkflowExecutionResponse{}, *resp) + assert.Equal(t, assert.AnError, err) + }) + t.Run("SyncActivity", func(t *testing.T) { + h.EXPECT().SyncActivity(ctx, &hist.SyncActivityRequest{}).Return(assert.AnError).Times(1) + err := th.SyncActivity(ctx, &hist.SyncActivityRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("SyncShardStatus", func(t *testing.T) { + h.EXPECT().SyncShardStatus(ctx, &hist.SyncShardStatusRequest{}).Return(assert.AnError).Times(1) + err := th.SyncShardStatus(ctx, &hist.SyncShardStatusRequest{}) + assert.Equal(t, assert.AnError, err) + }) + t.Run("TerminateWorkflowExecution", func(t *testing.T) { + h.EXPECT().TerminateWorkflowExecution(ctx, &hist.TerminateWorkflowExecutionRequest{}).Return(assert.AnError).Times(1) + err := th.TerminateWorkflowExecution(ctx, &hist.TerminateWorkflowExecutionRequest{}) + assert.Equal(t, assert.AnError, err) + }) +}