diff --git a/client/matching/client.go b/client/matching/client.go index 2023996c093..a1f7ab4cdcc 100644 --- a/client/matching/client.go +++ b/client/matching/client.go @@ -93,7 +93,7 @@ func (c *clientImpl) PollForActivityTask( ctx context.Context, request *types.MatchingPollForActivityTaskRequest, opts ...yarpc.CallOption, -) (*types.MatchingPollForActivityTaskResponse, error) { +) (*types.PollForActivityTaskResponse, error) { partition := c.loadBalancer.PickReadPartition( request.GetDomainUUID(), *request.PollRequest.GetTaskList(), diff --git a/client/matching/client_test.go b/client/matching/client_test.go index a2dc208ab95..5e57d3a76f0 100644 --- a/client/matching/client_test.go +++ b/client/matching/client_test.go @@ -228,9 +228,9 @@ func TestClient_withResponse(t *testing.T) { mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient) { balancer.EXPECT().PickReadPartition(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity, "").Return(_testPartition) p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil) - c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.MatchingPollForActivityTaskResponse{}, nil) + c.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(&types.PollForActivityTaskResponse{}, nil) }, - want: &types.MatchingPollForActivityTaskResponse{}, + want: &types.PollForActivityTaskResponse{}, }, { name: "PollForActivityTask - Error in resolving peer", diff --git a/client/matching/interface.go b/client/matching/interface.go index b5c76451387..9093bb6ccac 100644 --- a/client/matching/interface.go +++ b/client/matching/interface.go @@ -44,7 +44,7 @@ type Client interface { DescribeTaskList(context.Context, *types.MatchingDescribeTaskListRequest, ...yarpc.CallOption) (*types.DescribeTaskListResponse, error) ListTaskListPartitions(context.Context, *types.MatchingListTaskListPartitionsRequest, ...yarpc.CallOption) (*types.ListTaskListPartitionsResponse, error) GetTaskListsByDomain(context.Context, *types.GetTaskListsByDomainRequest, ...yarpc.CallOption) (*types.GetTaskListsByDomainResponse, error) - PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest, ...yarpc.CallOption) (*types.MatchingPollForActivityTaskResponse, error) + PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest, ...yarpc.CallOption) (*types.PollForActivityTaskResponse, error) PollForDecisionTask(context.Context, *types.MatchingPollForDecisionTaskRequest, ...yarpc.CallOption) (*types.MatchingPollForDecisionTaskResponse, error) QueryWorkflow(context.Context, *types.MatchingQueryWorkflowRequest, ...yarpc.CallOption) (*types.QueryWorkflowResponse, error) RespondQueryTaskCompleted(context.Context, *types.MatchingRespondQueryTaskCompletedRequest, ...yarpc.CallOption) error diff --git a/client/matching/interface_mock.go b/client/matching/interface_mock.go index 5887d0e8d5f..cdaf3474a7d 100644 --- a/client/matching/interface_mock.go +++ b/client/matching/interface_mock.go @@ -177,14 +177,14 @@ func (mr *MockClientMockRecorder) ListTaskListPartitions(arg0, arg1 interface{}, } // PollForActivityTask mocks base method. -func (m *MockClient) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest, arg2 ...yarpc.CallOption) (*types.MatchingPollForActivityTaskResponse, error) { +func (m *MockClient) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest, arg2 ...yarpc.CallOption) (*types.PollForActivityTaskResponse, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} for _, a := range arg2 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "PollForActivityTask", varargs...) - ret0, _ := ret[0].(*types.MatchingPollForActivityTaskResponse) + ret0, _ := ret[0].(*types.PollForActivityTaskResponse) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/client/wrappers/errorinjectors/matching_generated.go b/client/wrappers/errorinjectors/matching_generated.go index 161d8229863..1f30730e504 100644 --- a/client/wrappers/errorinjectors/matching_generated.go +++ b/client/wrappers/errorinjectors/matching_generated.go @@ -182,11 +182,11 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { fakeErr := c.fakeErrFn(c.errorRate) var forwardCall bool if forwardCall = c.forwardCallFn(fakeErr); forwardCall { - mp2, err = c.client.PollForActivityTask(ctx, mp1, p1...) + pp1, err = c.client.PollForActivityTask(ctx, mp1, p1...) } if fakeErr != nil { diff --git a/client/wrappers/grpc/matching_generated.go b/client/wrappers/grpc/matching_generated.go index fc1eed13446..0c9a237619a 100644 --- a/client/wrappers/grpc/matching_generated.go +++ b/client/wrappers/grpc/matching_generated.go @@ -65,7 +65,7 @@ func (g matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types.M return proto.ToMatchingListTaskListPartitionsResponse(response), proto.ToError(err) } -func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { +func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { response, err := g.c.PollForActivityTask(ctx, proto.FromMatchingPollForActivityTaskRequest(mp1), p1...) return proto.ToMatchingPollForActivityTaskResponse(response), proto.ToError(err) } diff --git a/client/wrappers/metered/matching_generated.go b/client/wrappers/metered/matching_generated.go index 55ad7dc8429..99460ab7ba7 100644 --- a/client/wrappers/metered/matching_generated.go +++ b/client/wrappers/metered/matching_generated.go @@ -136,18 +136,18 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return lp1, err } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.CadenceClientRequests) c.emitForwardedFromStats(metrics.MatchingClientPollForActivityTaskScope, mp1) sw := c.metricsClient.StartTimer(metrics.MatchingClientPollForActivityTaskScope, metrics.CadenceClientLatency) - mp2, err = c.client.PollForActivityTask(ctx, mp1, p1...) + pp1, err = c.client.PollForActivityTask(ctx, mp1, p1...) sw.Stop() if err != nil { c.metricsClient.IncCounter(metrics.MatchingClientPollForActivityTaskScope, metrics.CadenceClientFailures) } - return mp2, err + return pp1, err } func (c *matchingClient) PollForDecisionTask(ctx context.Context, mp1 *types.MatchingPollForDecisionTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForDecisionTaskResponse, err error) { diff --git a/client/wrappers/retryable/matching_generated.go b/client/wrappers/retryable/matching_generated.go index 1c9960e2c27..5af5902bae9 100644 --- a/client/wrappers/retryable/matching_generated.go +++ b/client/wrappers/retryable/matching_generated.go @@ -107,8 +107,8 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return resp, err } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { - var resp *types.MatchingPollForActivityTaskResponse +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { + var resp *types.PollForActivityTaskResponse op := func() error { var err error resp, err = c.client.PollForActivityTask(ctx, mp1, p1...) diff --git a/client/wrappers/thrift/matching_generated.go b/client/wrappers/thrift/matching_generated.go index ad9035f8937..ebe6e76dda1 100644 --- a/client/wrappers/thrift/matching_generated.go +++ b/client/wrappers/thrift/matching_generated.go @@ -65,7 +65,7 @@ func (g matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types.M return thrift.ToMatchingListTaskListPartitionsResponse(response), thrift.ToError(err) } -func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { +func (g matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { response, err := g.c.PollForActivityTask(ctx, thrift.FromMatchingPollForActivityTaskRequest(mp1), p1...) return thrift.ToMatchingPollForActivityTaskResponse(response), thrift.ToError(err) } diff --git a/client/wrappers/timeout/matching_generated.go b/client/wrappers/timeout/matching_generated.go index bc3e45ff83c..2bdc741e127 100644 --- a/client/wrappers/timeout/matching_generated.go +++ b/client/wrappers/timeout/matching_generated.go @@ -92,7 +92,7 @@ func (c *matchingClient) ListTaskListPartitions(ctx context.Context, mp1 *types. return c.client.ListTaskListPartitions(ctx, mp1, p1...) } -func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (mp2 *types.MatchingPollForActivityTaskResponse, err error) { +func (c *matchingClient) PollForActivityTask(ctx context.Context, mp1 *types.MatchingPollForActivityTaskRequest, p1 ...yarpc.CallOption) (pp1 *types.PollForActivityTaskResponse, err error) { ctx, cancel := createContext(ctx, c.longPollTimeout) defer cancel() return c.client.PollForActivityTask(ctx, mp1, p1...) diff --git a/common/types/mapper/proto/matching.go b/common/types/mapper/proto/matching.go index 784cc5eb54d..22e6ddbdc56 100644 --- a/common/types/mapper/proto/matching.go +++ b/common/types/mapper/proto/matching.go @@ -324,7 +324,7 @@ func ToMatchingPollForActivityTaskRequest(t *matchingv1.PollForActivityTaskReque } } -func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTaskResponse) *matchingv1.PollForActivityTaskResponse { +func FromMatchingPollForActivityTaskResponse(t *types.PollForActivityTaskResponse) *matchingv1.PollForActivityTaskResponse { if t == nil { return nil } @@ -348,11 +348,11 @@ func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTas } } -func ToMatchingPollForActivityTaskResponse(t *matchingv1.PollForActivityTaskResponse) *types.MatchingPollForActivityTaskResponse { +func ToMatchingPollForActivityTaskResponse(t *matchingv1.PollForActivityTaskResponse) *types.PollForActivityTaskResponse { if t == nil { return nil } - return &types.MatchingPollForActivityTaskResponse{ + return &types.PollForActivityTaskResponse{ TaskToken: t.TaskToken, WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution), ActivityID: t.ActivityId, diff --git a/common/types/mapper/proto/matching_test.go b/common/types/mapper/proto/matching_test.go index ea47bb1a8e5..4aa54d2e08a 100644 --- a/common/types/mapper/proto/matching_test.go +++ b/common/types/mapper/proto/matching_test.go @@ -84,7 +84,7 @@ func TestMatchingPollForActivityTaskRequest(t *testing.T) { } func TestMatchingPollForActivityTaskResponse(t *testing.T) { - for _, item := range []*types.MatchingPollForActivityTaskResponse{nil, {}, &testdata.MatchingPollForActivityTaskResponse} { + for _, item := range []*types.PollForActivityTaskResponse{nil, {}, &testdata.MatchingPollForActivityTaskResponse} { assert.Equal(t, item, ToMatchingPollForActivityTaskResponse(FromMatchingPollForActivityTaskResponse(item))) } } diff --git a/common/types/mapper/thrift/matching.go b/common/types/mapper/thrift/matching.go index 6e87800eb50..a7cf6e9d687 100644 --- a/common/types/mapper/thrift/matching.go +++ b/common/types/mapper/thrift/matching.go @@ -22,7 +22,6 @@ package thrift import ( "github.com/uber/cadence/.gen/go/matching" - "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/types" ) @@ -36,6 +35,8 @@ var ( ToMatchingGetTaskListsByDomainResponse = ToGetTaskListsByDomainResponse FromMatchingListTaskListPartitionsResponse = FromListTaskListPartitionsResponse ToMatchingListTaskListPartitionsResponse = ToListTaskListPartitionsResponse + FromMatchingPollForActivityTaskResponse = FromPollForActivityTaskResponse + ToMatchingPollForActivityTaskResponse = ToPollForActivityTaskResponse FromMatchingQueryWorkflowResponse = FromQueryWorkflowResponse ToMatchingQueryWorkflowResponse = ToQueryWorkflowResponse ) @@ -323,54 +324,6 @@ func ToMatchingPollForDecisionTaskResponse(t *matching.PollForDecisionTaskRespon } } -func FromMatchingPollForActivityTaskResponse(t *types.MatchingPollForActivityTaskResponse) *shared.PollForActivityTaskResponse { - if t == nil { - return nil - } - return &shared.PollForActivityTaskResponse{ - TaskToken: t.TaskToken, - WorkflowExecution: FromWorkflowExecution(t.WorkflowExecution), - ActivityId: &t.ActivityID, - ActivityType: FromActivityType(t.ActivityType), - Input: t.Input, - ScheduledTimestamp: t.ScheduledTimestamp, - ScheduleToCloseTimeoutSeconds: t.ScheduleToCloseTimeoutSeconds, - StartedTimestamp: t.StartedTimestamp, - StartToCloseTimeoutSeconds: t.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: t.HeartbeatTimeoutSeconds, - Attempt: &t.Attempt, - ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt, - HeartbeatDetails: t.HeartbeatDetails, - WorkflowType: FromWorkflowType(t.WorkflowType), - WorkflowDomain: &t.WorkflowDomain, - Header: FromHeader(t.Header), - } -} - -func ToMatchingPollForActivityTaskResponse(t *shared.PollForActivityTaskResponse) *types.MatchingPollForActivityTaskResponse { - if t == nil { - return nil - } - return &types.MatchingPollForActivityTaskResponse{ - TaskToken: t.TaskToken, - WorkflowExecution: ToWorkflowExecution(t.WorkflowExecution), - ActivityID: t.GetActivityId(), - ActivityType: ToActivityType(t.ActivityType), - Input: t.Input, - ScheduledTimestamp: t.ScheduledTimestamp, - ScheduleToCloseTimeoutSeconds: t.ScheduleToCloseTimeoutSeconds, - StartedTimestamp: t.StartedTimestamp, - StartToCloseTimeoutSeconds: t.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: t.HeartbeatTimeoutSeconds, - Attempt: t.GetAttempt(), - ScheduledTimestampOfThisAttempt: t.ScheduledTimestampOfThisAttempt, - HeartbeatDetails: t.HeartbeatDetails, - WorkflowType: ToWorkflowType(t.WorkflowType), - WorkflowDomain: t.GetWorkflowDomain(), - Header: ToHeader(t.Header), - } -} - // FromMatchingQueryWorkflowRequest converts internal QueryWorkflowRequest type to thrift func FromMatchingQueryWorkflowRequest(t *types.MatchingQueryWorkflowRequest) *matching.QueryWorkflowRequest { if t == nil { diff --git a/common/types/mapper/thrift/matching_test.go b/common/types/mapper/thrift/matching_test.go index 83564d4fed5..19291d461f1 100644 --- a/common/types/mapper/thrift/matching_test.go +++ b/common/types/mapper/thrift/matching_test.go @@ -262,31 +262,6 @@ func TestMatchingPollForDecisionResponse(t *testing.T) { } } -func TestMatchingPollForActivityTaskResponse(t *testing.T) { - testCases := []struct { - desc string - input *types.MatchingPollForActivityTaskResponse - }{ - { - desc: "non-nil input test", - input: &testdata.MatchingPollForActivityTaskResponse, - }, - { - desc: "empty input test", - input: &types.MatchingPollForActivityTaskResponse{}, - }, - { - desc: "nil input test", - input: nil, - }, - } - for _, tc := range testCases { - thriftObj := FromMatchingPollForActivityTaskResponse(tc.input) - roundTripObj := ToMatchingPollForActivityTaskResponse(thriftObj) - assert.Equal(t, tc.input, roundTripObj) - } -} - func TestMatchingQueryWorkflowRequest(t *testing.T) { testCases := []struct { desc string diff --git a/common/types/matching.go b/common/types/matching.go index 9d6e016e295..3767409a1ea 100644 --- a/common/types/matching.go +++ b/common/types/matching.go @@ -480,26 +480,6 @@ func (v *MatchingPollForDecisionTaskResponse) GetTotalHistoryBytes() (o int64) { return } -type MatchingPollForActivityTaskResponse struct { - TaskToken []byte `json:"taskToken,omitempty"` - WorkflowExecution *WorkflowExecution `json:"workflowExecution,omitempty"` - ActivityID string `json:"activityId,omitempty"` - ActivityType *ActivityType `json:"activityType,omitempty"` - Input []byte `json:"input,omitempty"` - ScheduledTimestamp *int64 `json:"scheduledTimestamp,omitempty"` - ScheduleToCloseTimeoutSeconds *int32 `json:"scheduleToCloseTimeoutSeconds,omitempty"` - StartedTimestamp *int64 `json:"startedTimestamp,omitempty"` - StartToCloseTimeoutSeconds *int32 `json:"startToCloseTimeoutSeconds,omitempty"` - HeartbeatTimeoutSeconds *int32 `json:"heartbeatTimeoutSeconds,omitempty"` - Attempt int32 `json:"attempt,omitempty"` - ScheduledTimestampOfThisAttempt *int64 `json:"scheduledTimestampOfThisAttempt,omitempty"` - HeartbeatDetails []byte `json:"heartbeatDetails,omitempty"` - WorkflowType *WorkflowType `json:"workflowType,omitempty"` - WorkflowDomain string `json:"workflowDomain,omitempty"` - Header *Header `json:"header,omitempty"` - BacklogCountHint int64 `json:"backlogCountHint,omitempty"` -} - // MatchingQueryWorkflowRequest is an internal type (TBD...) type MatchingQueryWorkflowRequest struct { DomainUUID string `json:"domainUUID,omitempty"` diff --git a/common/types/testdata/service_matching.go b/common/types/testdata/service_matching.go index 4ab02b16754..2d9eecbf041 100644 --- a/common/types/testdata/service_matching.go +++ b/common/types/testdata/service_matching.go @@ -81,7 +81,7 @@ var ( ForwardedFrom: ForwardedFrom, IsolationGroup: IsolationGroup, } - MatchingPollForActivityTaskResponse = types.MatchingPollForActivityTaskResponse{ + MatchingPollForActivityTaskResponse = types.PollForActivityTaskResponse{ TaskToken: TaskToken, WorkflowExecution: &WorkflowExecution, ActivityID: ActivityID, diff --git a/service/frontend/api/handler.go b/service/frontend/api/handler.go index 6003ddc76ac..143410b1281 100644 --- a/service/frontend/api/handler.go +++ b/service/frontend/api/handler.go @@ -554,9 +554,8 @@ func (wh *WorkflowHandler) PollForActivityTask( return &types.PollForActivityTaskResponse{}, nil } pollerID := uuid.New().String() - var matchingResp *types.MatchingPollForActivityTaskResponse op := func() error { - matchingResp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{ + resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{ DomainUUID: domainID, PollerID: pollerID, PollRequest: pollRequest, @@ -582,24 +581,7 @@ func (wh *WorkflowHandler) PollForActivityTask( return nil, err } } - return &types.PollForActivityTaskResponse{ - TaskToken: matchingResp.TaskToken, - WorkflowExecution: matchingResp.WorkflowExecution, - ActivityID: matchingResp.ActivityID, - ActivityType: matchingResp.ActivityType, - Input: matchingResp.Input, - ScheduledTimestamp: matchingResp.ScheduledTimestamp, - ScheduleToCloseTimeoutSeconds: matchingResp.ScheduleToCloseTimeoutSeconds, - StartedTimestamp: matchingResp.StartedTimestamp, - StartToCloseTimeoutSeconds: matchingResp.StartToCloseTimeoutSeconds, - HeartbeatTimeoutSeconds: matchingResp.HeartbeatTimeoutSeconds, - Attempt: matchingResp.Attempt, - ScheduledTimestampOfThisAttempt: matchingResp.ScheduledTimestampOfThisAttempt, - HeartbeatDetails: matchingResp.HeartbeatDetails, - WorkflowType: matchingResp.WorkflowType, - WorkflowDomain: matchingResp.WorkflowDomain, - Header: matchingResp.Header, - }, nil + return resp, nil } // PollForDecisionTask - Poll for a decision task. diff --git a/service/frontend/api/handler_test.go b/service/frontend/api/handler_test.go index cb232c0b5f7..cb8affdf408 100644 --- a/service/frontend/api/handler_test.go +++ b/service/frontend/api/handler_test.go @@ -309,45 +309,6 @@ func (s *workflowHandlerSuite) TestPollForActivityTask_IsolationGroupDrained() { s.Equal(&types.PollForActivityTaskResponse{}, resp) } -func (s *workflowHandlerSuite) TestPollForActivityTask_Success() { - config := s.newConfig(dc.NewInMemoryClient()) - config.EnableTasklistIsolation = dc.GetBoolPropertyFnFilteredByDomain(true) - wh := s.getWorkflowHandler(config) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - isolationGroup := "dca1" - ctx = partition.ContextWithIsolationGroup(ctx, isolationGroup) - - s.mockDomainCache.EXPECT().GetDomainID(s.testDomain).Return(s.testDomainID, nil) - s.mockResource.IsolationGroups.EXPECT().IsDrained(gomock.Any(), s.testDomain, isolationGroup).Return(false, nil).AnyTimes() - s.mockMatchingClient.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any()).Return(&types.MatchingPollForActivityTaskResponse{ - TaskToken: []byte("token"), - WorkflowExecution: &types.WorkflowExecution{ - WorkflowID: "wid", - RunID: "rid", - }, - ActivityID: "1", - Input: []byte(`{"key": "value"}`), - }, nil) - resp, err := wh.PollForActivityTask(ctx, &types.PollForActivityTaskRequest{ - Domain: s.testDomain, - TaskList: &types.TaskList{ - Name: "task-list", - }, - }) - s.NoError(err) - s.Equal(&types.PollForActivityTaskResponse{ - TaskToken: []byte("token"), - WorkflowExecution: &types.WorkflowExecution{ - WorkflowID: "wid", - RunID: "rid", - }, - ActivityID: "1", - Input: []byte(`{"key": "value"}`), - }, resp) -} - func (s *workflowHandlerSuite) TestStartWorkflowExecution_Failed_RequestIdNotSet() { config := s.newConfig(dc.NewInMemoryClient()) config.UserRPS = dc.GetIntPropertyFn(10) diff --git a/service/matching/handler/engine.go b/service/matching/handler/engine.go index 6c1b9362f99..3113b740932 100644 --- a/service/matching/handler/engine.go +++ b/service/matching/handler/engine.go @@ -111,7 +111,7 @@ var ( // EmptyPollForDecisionTaskResponse is the response when there are no decision tasks to hand out emptyPollForDecisionTaskResponse = &types.MatchingPollForDecisionTaskResponse{} // EmptyPollForActivityTaskResponse is the response when there are no activity tasks to hand out - emptyPollForActivityTaskResponse = &types.MatchingPollForActivityTaskResponse{} + emptyPollForActivityTaskResponse = &types.PollForActivityTaskResponse{} historyServiceOperationRetryPolicy = common.CreateHistoryServiceRetryPolicy() errPumpClosed = errors.New("task list pump closed its channel") @@ -651,7 +651,7 @@ pollLoop: func (e *matchingEngineImpl) PollForActivityTask( hCtx *handlerContext, req *types.MatchingPollForActivityTaskRequest, -) (*types.MatchingPollForActivityTaskResponse, error) { +) (*types.PollForActivityTaskResponse, error) { domainID := req.GetDomainUUID() pollerID := req.GetPollerID() request := req.PollRequest @@ -744,11 +744,11 @@ pollLoop: func (e *matchingEngineImpl) createSyncMatchPollForActivityTaskResponse( task *tasklist.InternalTask, activityTaskDispatchInfo *types.ActivityTaskDispatchInfo, -) *types.MatchingPollForActivityTaskResponse { +) *types.PollForActivityTaskResponse { scheduledEvent := activityTaskDispatchInfo.ScheduledEvent attributes := scheduledEvent.ActivityTaskScheduledEventAttributes - response := &types.MatchingPollForActivityTaskResponse{} + response := &types.PollForActivityTaskResponse{} response.ActivityID = attributes.ActivityID response.ActivityType = attributes.ActivityType response.Header = attributes.Header @@ -1075,7 +1075,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse( task *tasklist.InternalTask, historyResponse *types.RecordActivityTaskStartedResponse, scope metrics.Scope, -) *types.MatchingPollForActivityTaskResponse { +) *types.PollForActivityTaskResponse { scheduledEvent := historyResponse.ScheduledEvent if scheduledEvent.ActivityTaskScheduledEventAttributes == nil { @@ -1089,7 +1089,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse( scope.RecordTimer(metrics.AsyncMatchLatencyPerTaskList, time.Since(task.Event.CreatedTime)) } - response := &types.MatchingPollForActivityTaskResponse{} + response := &types.PollForActivityTaskResponse{} response.ActivityID = attributes.ActivityID response.ActivityType = attributes.ActivityType response.Header = attributes.Header diff --git a/service/matching/handler/handler.go b/service/matching/handler/handler.go index 22fa6eb6728..c7ca760ae80 100644 --- a/service/matching/handler/handler.go +++ b/service/matching/handler/handler.go @@ -194,7 +194,7 @@ func (h *handlerImpl) AddDecisionTask( func (h *handlerImpl) PollForActivityTask( ctx context.Context, request *types.MatchingPollForActivityTaskRequest, -) (resp *types.MatchingPollForActivityTaskResponse, retError error) { +) (resp *types.PollForActivityTaskResponse, retError error) { defer func() { log.CapturePanic(recover(), h.logger, &retError) }() domainName := h.domainName(request.GetDomainUUID()) diff --git a/service/matching/handler/handler_test.go b/service/matching/handler/handler_test.go index b3f2c29900f..f30b701ee7f 100644 --- a/service/matching/handler/handler_test.go +++ b/service/matching/handler/handler_test.go @@ -277,7 +277,7 @@ func (s *handlerSuite) TestPollForActivityTask() { setupMocks: func() { s.mockLimiter.EXPECT().Allow().Return(true).Times(1) s.mockEngine.EXPECT().PollForActivityTask(gomock.Any(), &request). - Return(&types.MatchingPollForActivityTaskResponse{TaskToken: []byte("task-token")}, nil).Times(1) + Return(&types.PollForActivityTaskResponse{TaskToken: []byte("task-token")}, nil).Times(1) }, getCtx: func() (context.Context, context.CancelFunc) { ctx, cancel := context.WithDeadline(context.Background(), time.Now()) @@ -331,7 +331,7 @@ func (s *handlerSuite) TestPollForActivityTask() { s.Equal(tc.err, err) } else { s.NoError(err) - s.Equal(&types.MatchingPollForActivityTaskResponse{TaskToken: []byte("task-token")}, resp) + s.Equal(&types.PollForActivityTaskResponse{TaskToken: []byte("task-token")}, resp) } }) } diff --git a/service/matching/handler/interfaces.go b/service/matching/handler/interfaces.go index 1535fe92693..21f39f160e0 100644 --- a/service/matching/handler/interfaces.go +++ b/service/matching/handler/interfaces.go @@ -39,7 +39,7 @@ type ( AddDecisionTask(hCtx *handlerContext, request *types.AddDecisionTaskRequest) (syncMatch bool, err error) AddActivityTask(hCtx *handlerContext, request *types.AddActivityTaskRequest) (syncMatch bool, err error) PollForDecisionTask(hCtx *handlerContext, request *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error) - PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) + PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) QueryWorkflow(hCtx *handlerContext, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error) RespondQueryTaskCompleted(hCtx *handlerContext, request *types.MatchingRespondQueryTaskCompletedRequest) error CancelOutstandingPoll(hCtx *handlerContext, request *types.CancelOutstandingPollRequest) error @@ -59,7 +59,7 @@ type ( DescribeTaskList(context.Context, *types.MatchingDescribeTaskListRequest) (*types.DescribeTaskListResponse, error) ListTaskListPartitions(context.Context, *types.MatchingListTaskListPartitionsRequest) (*types.ListTaskListPartitionsResponse, error) GetTaskListsByDomain(context.Context, *types.GetTaskListsByDomainRequest) (*types.GetTaskListsByDomainResponse, error) - PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) + PollForActivityTask(context.Context, *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) PollForDecisionTask(context.Context, *types.MatchingPollForDecisionTaskRequest) (*types.MatchingPollForDecisionTaskResponse, error) QueryWorkflow(context.Context, *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error) RespondQueryTaskCompleted(context.Context, *types.MatchingRespondQueryTaskCompletedRequest) error diff --git a/service/matching/handler/interfaces_mock.go b/service/matching/handler/interfaces_mock.go index b2101288f56..78401a46838 100644 --- a/service/matching/handler/interfaces_mock.go +++ b/service/matching/handler/interfaces_mock.go @@ -148,10 +148,10 @@ func (mr *MockEngineMockRecorder) ListTaskListPartitions(hCtx, request interface } // PollForActivityTask mocks base method. -func (m *MockEngine) PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) { +func (m *MockEngine) PollForActivityTask(hCtx *handlerContext, request *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PollForActivityTask", hCtx, request) - ret0, _ := ret[0].(*types.MatchingPollForActivityTaskResponse) + ret0, _ := ret[0].(*types.PollForActivityTaskResponse) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -356,10 +356,10 @@ func (mr *MockHandlerMockRecorder) ListTaskListPartitions(arg0, arg1 interface{} } // PollForActivityTask mocks base method. -func (m *MockHandler) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest) (*types.MatchingPollForActivityTaskResponse, error) { +func (m *MockHandler) PollForActivityTask(arg0 context.Context, arg1 *types.MatchingPollForActivityTaskRequest) (*types.PollForActivityTaskResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PollForActivityTask", arg0, arg1) - ret0, _ := ret[0].(*types.MatchingPollForActivityTaskResponse) + ret0, _ := ret[0].(*types.PollForActivityTaskResponse) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/matching/tasklist/forwarder_test.go b/service/matching/tasklist/forwarder_test.go index 77af4d36b0d..34cd3da19cd 100644 --- a/service/matching/tasklist/forwarder_test.go +++ b/service/matching/tasklist/forwarder_test.go @@ -235,7 +235,7 @@ func (t *ForwarderTestSuite) TestForwardPollForActivity() { pollerID := uuid.New() ctx := ContextWithPollerID(context.Background(), pollerID) ctx = ContextWithIdentity(ctx, "id1") - resp := &types.MatchingPollForActivityTaskResponse{} + resp := &types.PollForActivityTaskResponse{} var request *types.MatchingPollForActivityTaskRequest t.client.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any()).Do( diff --git a/service/matching/tasklist/task.go b/service/matching/tasklist/task.go index 379221689eb..c27c27780d8 100644 --- a/service/matching/tasklist/task.go +++ b/service/matching/tasklist/task.go @@ -41,7 +41,7 @@ type ( // another matching host. This type of task is already marked as started startedTaskInfo struct { decisionTaskInfo *types.MatchingPollForDecisionTaskResponse - activityTaskInfo *types.MatchingPollForActivityTaskResponse + activityTaskInfo *types.PollForActivityTaskResponse } // InternalTask represents an activity, decision, query or started (received from another host). // this struct is more like a union and only one of [ query, event, forwarded ] is @@ -156,7 +156,7 @@ func (task *InternalTask) PollForDecisionResponse() *types.MatchingPollForDecisi // pollForActivityResponse returns the poll response for an activity task that is // already marked as started. This method should only be called when isStarted() is true -func (task *InternalTask) PollForActivityResponse() *types.MatchingPollForActivityTaskResponse { +func (task *InternalTask) PollForActivityResponse() *types.PollForActivityTaskResponse { if task.IsStarted() { return task.started.activityTaskInfo } diff --git a/service/matching/wrappers/thrift/thrift_handler_test.go b/service/matching/wrappers/thrift/thrift_handler_test.go index cdeeadb7768..6113edc7d65 100644 --- a/service/matching/wrappers/thrift/thrift_handler_test.go +++ b/service/matching/wrappers/thrift/thrift_handler_test.go @@ -78,7 +78,7 @@ func TestThriftHandler(t *testing.T) { assert.Equal(t, expectedErr, err) }) t.Run("PollForActivityTask", func(t *testing.T) { - h.EXPECT().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{}).Return(&types.MatchingPollForActivityTaskResponse{}, internalErr).Times(1) + h.EXPECT().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{}).Return(&types.PollForActivityTaskResponse{}, internalErr).Times(1) resp, err := th.PollForActivityTask(ctx, &m.PollForActivityTaskRequest{}) assert.Equal(t, s.PollForActivityTaskResponse{WorkflowDomain: common.StringPtr(""), ActivityId: common.StringPtr(""), Attempt: common.Int32Ptr(0)}, *resp) assert.Equal(t, expectedErr, err)