diff --git a/client/admin/client.go b/client/admin/client.go index 91604ff98c5..5440a79928c 100644 --- a/client/admin/client.go +++ b/client/admin/client.go @@ -28,7 +28,6 @@ import ( "go.uber.org/yarpc" "github.com/uber/cadence/.gen/go/admin" - "github.com/uber/cadence/.gen/go/admin/adminserviceclient" "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" @@ -360,7 +359,7 @@ func (c *clientImpl) createContextWithLargeTimeout(parent context.Context) (cont return context.WithTimeout(parent, c.largeTimeout) } -func (c *clientImpl) getRandomClient() (adminserviceclient.Interface, error) { +func (c *clientImpl) getRandomClient() (Client, error) { // generate a random shard key to do load balancing key := uuid.New() client, err := c.clients.GetClientForKey(key) @@ -368,5 +367,5 @@ func (c *clientImpl) getRandomClient() (adminserviceclient.Interface, error) { return nil, err } - return client.(adminserviceclient.Interface), nil + return client.(Client), nil } diff --git a/client/admin/interface.go b/client/admin/interface.go index 1ea262915fb..55f1629d97d 100644 --- a/client/admin/interface.go +++ b/client/admin/interface.go @@ -20,9 +20,34 @@ package admin -import "github.com/uber/cadence/.gen/go/admin/adminserviceclient" +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/admin" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" +) // Client is the interface exposed by admin service client type Client interface { - adminserviceclient.Interface + AddSearchAttribute(context.Context, *admin.AddSearchAttributeRequest, ...yarpc.CallOption) error + CloseShard(context.Context, *shared.CloseShardRequest, ...yarpc.CallOption) error + DescribeCluster(context.Context, ...yarpc.CallOption) (*admin.DescribeClusterResponse, error) + DescribeHistoryHost(context.Context, *shared.DescribeHistoryHostRequest, ...yarpc.CallOption) (*shared.DescribeHistoryHostResponse, error) + DescribeQueue(context.Context, *shared.DescribeQueueRequest, ...yarpc.CallOption) (*shared.DescribeQueueResponse, error) + DescribeWorkflowExecution(context.Context, *admin.DescribeWorkflowExecutionRequest, ...yarpc.CallOption) (*admin.DescribeWorkflowExecutionResponse, error) + GetDLQReplicationMessages(context.Context, *replicator.GetDLQReplicationMessagesRequest, ...yarpc.CallOption) (*replicator.GetDLQReplicationMessagesResponse, error) + GetDomainReplicationMessages(context.Context, *replicator.GetDomainReplicationMessagesRequest, ...yarpc.CallOption) (*replicator.GetDomainReplicationMessagesResponse, error) + GetReplicationMessages(context.Context, *replicator.GetReplicationMessagesRequest, ...yarpc.CallOption) (*replicator.GetReplicationMessagesResponse, error) + GetWorkflowExecutionRawHistoryV2(context.Context, *admin.GetWorkflowExecutionRawHistoryV2Request, ...yarpc.CallOption) (*admin.GetWorkflowExecutionRawHistoryV2Response, error) + MergeDLQMessages(context.Context, *replicator.MergeDLQMessagesRequest, ...yarpc.CallOption) (*replicator.MergeDLQMessagesResponse, error) + PurgeDLQMessages(context.Context, *replicator.PurgeDLQMessagesRequest, ...yarpc.CallOption) error + ReadDLQMessages(context.Context, *replicator.ReadDLQMessagesRequest, ...yarpc.CallOption) (*replicator.ReadDLQMessagesResponse, error) + ReapplyEvents(context.Context, *shared.ReapplyEventsRequest, ...yarpc.CallOption) error + RefreshWorkflowTasks(context.Context, *shared.RefreshWorkflowTasksRequest, ...yarpc.CallOption) error + RemoveTask(context.Context, *shared.RemoveTaskRequest, ...yarpc.CallOption) error + ResendReplicationTasks(context.Context, *admin.ResendReplicationTasksRequest, ...yarpc.CallOption) error + ResetQueue(context.Context, *shared.ResetQueueRequest, ...yarpc.CallOption) error } diff --git a/client/admin/thriftClient.go b/client/admin/thriftClient.go new file mode 100644 index 00000000000..ea880631b34 --- /dev/null +++ b/client/admin/thriftClient.go @@ -0,0 +1,113 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package admin + +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/admin" + "github.com/uber/cadence/.gen/go/admin/adminserviceclient" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" +) + +type thriftClient struct { + c adminserviceclient.Interface +} + +// NewThriftClient creates a new instance of Client with thrift protocol +func NewThriftClient(c adminserviceclient.Interface) Client { + return thriftClient{c} +} + +func (t thriftClient) AddSearchAttribute(ctx context.Context, request *admin.AddSearchAttributeRequest, opts ...yarpc.CallOption) error { + return t.c.AddSearchAttribute(ctx, request, opts...) +} + +func (t thriftClient) CloseShard(ctx context.Context, request *shared.CloseShardRequest, opts ...yarpc.CallOption) error { + return t.c.CloseShard(ctx, request, opts...) +} + +func (t thriftClient) DescribeCluster(ctx context.Context, opts ...yarpc.CallOption) (*admin.DescribeClusterResponse, error) { + return t.c.DescribeCluster(ctx, opts...) +} + +func (t thriftClient) DescribeHistoryHost(ctx context.Context, request *shared.DescribeHistoryHostRequest, opts ...yarpc.CallOption) (*shared.DescribeHistoryHostResponse, error) { + return t.c.DescribeHistoryHost(ctx, request, opts...) +} + +func (t thriftClient) DescribeQueue(ctx context.Context, request *shared.DescribeQueueRequest, opts ...yarpc.CallOption) (*shared.DescribeQueueResponse, error) { + return t.c.DescribeQueue(ctx, request, opts...) +} + +func (t thriftClient) DescribeWorkflowExecution(ctx context.Context, request *admin.DescribeWorkflowExecutionRequest, opts ...yarpc.CallOption) (*admin.DescribeWorkflowExecutionResponse, error) { + return t.c.DescribeWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) GetDLQReplicationMessages(ctx context.Context, request *replicator.GetDLQReplicationMessagesRequest, opts ...yarpc.CallOption) (*replicator.GetDLQReplicationMessagesResponse, error) { + return t.c.GetDLQReplicationMessages(ctx, request, opts...) +} + +func (t thriftClient) GetDomainReplicationMessages(ctx context.Context, request *replicator.GetDomainReplicationMessagesRequest, opts ...yarpc.CallOption) (*replicator.GetDomainReplicationMessagesResponse, error) { + return t.c.GetDomainReplicationMessages(ctx, request, opts...) +} + +func (t thriftClient) GetReplicationMessages(ctx context.Context, request *replicator.GetReplicationMessagesRequest, opts ...yarpc.CallOption) (*replicator.GetReplicationMessagesResponse, error) { + return t.c.GetReplicationMessages(ctx, request, opts...) +} + +func (t thriftClient) GetWorkflowExecutionRawHistoryV2(ctx context.Context, request *admin.GetWorkflowExecutionRawHistoryV2Request, opts ...yarpc.CallOption) (*admin.GetWorkflowExecutionRawHistoryV2Response, error) { + return t.c.GetWorkflowExecutionRawHistoryV2(ctx, request, opts...) +} + +func (t thriftClient) MergeDLQMessages(ctx context.Context, request *replicator.MergeDLQMessagesRequest, opts ...yarpc.CallOption) (*replicator.MergeDLQMessagesResponse, error) { + return t.c.MergeDLQMessages(ctx, request, opts...) +} + +func (t thriftClient) PurgeDLQMessages(ctx context.Context, request *replicator.PurgeDLQMessagesRequest, opts ...yarpc.CallOption) error { + return t.c.PurgeDLQMessages(ctx, request, opts...) +} + +func (t thriftClient) ReadDLQMessages(ctx context.Context, request *replicator.ReadDLQMessagesRequest, opts ...yarpc.CallOption) (*replicator.ReadDLQMessagesResponse, error) { + return t.c.ReadDLQMessages(ctx, request, opts...) +} + +func (t thriftClient) ReapplyEvents(ctx context.Context, request *shared.ReapplyEventsRequest, opts ...yarpc.CallOption) error { + return t.c.ReapplyEvents(ctx, request, opts...) +} + +func (t thriftClient) RefreshWorkflowTasks(ctx context.Context, request *shared.RefreshWorkflowTasksRequest, opts ...yarpc.CallOption) error { + return t.c.RefreshWorkflowTasks(ctx, request, opts...) +} + +func (t thriftClient) RemoveTask(ctx context.Context, request *shared.RemoveTaskRequest, opts ...yarpc.CallOption) error { + return t.c.RemoveTask(ctx, request, opts...) +} + +func (t thriftClient) ResendReplicationTasks(ctx context.Context, request *admin.ResendReplicationTasksRequest, opts ...yarpc.CallOption) error { + return t.c.ResendReplicationTasks(ctx, request, opts...) +} + +func (t thriftClient) ResetQueue(ctx context.Context, request *shared.ResetQueueRequest, opts ...yarpc.CallOption) error { + return t.c.ResetQueue(ctx, request, opts...) +} diff --git a/client/clientfactory.go b/client/clientfactory.go index 530d93b2c6f..4ee81a11cf4 100644 --- a/client/clientfactory.go +++ b/client/clientfactory.go @@ -156,7 +156,7 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout( clientProvider := func(clientKey string) (interface{}, error) { dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(matchingCaller, common.MatchingServiceName, clientKey) - return matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName)), nil + return matching.NewThriftClient(matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName))), nil } client := matching.NewClient( @@ -193,7 +193,7 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeout( clientProvider := func(clientKey string) (interface{}, error) { dispatcher := cf.rpcFactory.CreateDispatcherForOutbound(frontendCaller, common.FrontendServiceName, clientKey) - return workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName)), nil + return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(common.FrontendServiceName))), nil } client := frontend.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider)) @@ -214,7 +214,7 @@ func (cf *rpcClientFactory) NewAdminClientWithTimeoutAndDispatcher( } clientProvider := func(clientKey string) (interface{}, error) { - return adminserviceclient.New(dispatcher.ClientConfig(rpcName)), nil + return admin.NewThriftClient(adminserviceclient.New(dispatcher.ClientConfig(rpcName))), nil } client := admin.NewClient(timeout, largeTimeout, common.NewClientCache(keyResolver, clientProvider)) @@ -235,7 +235,7 @@ func (cf *rpcClientFactory) NewFrontendClientWithTimeoutAndDispatcher( } clientProvider := func(clientKey string) (interface{}, error) { - return workflowserviceclient.New(dispatcher.ClientConfig(rpcName)), nil + return frontend.NewThriftClient(workflowserviceclient.New(dispatcher.ClientConfig(rpcName))), nil } client := frontend.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider)) diff --git a/client/frontend/client.go b/client/frontend/client.go index 1f70af9b9a1..a53c258fb76 100644 --- a/client/frontend/client.go +++ b/client/frontend/client.go @@ -27,7 +27,6 @@ import ( "github.com/pborman/uuid" "go.uber.org/yarpc" - "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" ) @@ -649,7 +648,7 @@ func (c *clientImpl) createLongPollContext(parent context.Context) (context.Cont return context.WithTimeout(parent, c.longPollTimeout) } -func (c *clientImpl) getRandomClient() (workflowserviceclient.Interface, error) { +func (c *clientImpl) getRandomClient() (Client, error) { // generate a random shard key to do load balancing key := uuid.New() client, err := c.clients.GetClientForKey(key) @@ -657,7 +656,7 @@ func (c *clientImpl) getRandomClient() (workflowserviceclient.Interface, error) return nil, err } - return client.(workflowserviceclient.Interface), nil + return client.(Client), nil } func (c *clientImpl) GetClusterInfo( diff --git a/client/frontend/interface.go b/client/frontend/interface.go index df8990d6ff5..fc8db23c5da 100644 --- a/client/frontend/interface.go +++ b/client/frontend/interface.go @@ -21,10 +21,51 @@ package frontend import ( - "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/shared" ) // Client is the interface exposed by frontend service client type Client interface { - workflowserviceclient.Interface + CountWorkflowExecutions(context.Context, *shared.CountWorkflowExecutionsRequest, ...yarpc.CallOption) (*shared.CountWorkflowExecutionsResponse, error) + DeprecateDomain(context.Context, *shared.DeprecateDomainRequest, ...yarpc.CallOption) error + DescribeDomain(context.Context, *shared.DescribeDomainRequest, ...yarpc.CallOption) (*shared.DescribeDomainResponse, error) + DescribeTaskList(context.Context, *shared.DescribeTaskListRequest, ...yarpc.CallOption) (*shared.DescribeTaskListResponse, error) + DescribeWorkflowExecution(context.Context, *shared.DescribeWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.DescribeWorkflowExecutionResponse, error) + GetClusterInfo(context.Context, ...yarpc.CallOption) (*shared.ClusterInfo, error) + GetSearchAttributes(context.Context, ...yarpc.CallOption) (*shared.GetSearchAttributesResponse, error) + GetWorkflowExecutionHistory(context.Context, *shared.GetWorkflowExecutionHistoryRequest, ...yarpc.CallOption) (*shared.GetWorkflowExecutionHistoryResponse, error) + ListArchivedWorkflowExecutions(context.Context, *shared.ListArchivedWorkflowExecutionsRequest, ...yarpc.CallOption) (*shared.ListArchivedWorkflowExecutionsResponse, error) + ListClosedWorkflowExecutions(context.Context, *shared.ListClosedWorkflowExecutionsRequest, ...yarpc.CallOption) (*shared.ListClosedWorkflowExecutionsResponse, error) + ListDomains(context.Context, *shared.ListDomainsRequest, ...yarpc.CallOption) (*shared.ListDomainsResponse, error) + ListOpenWorkflowExecutions(context.Context, *shared.ListOpenWorkflowExecutionsRequest, ...yarpc.CallOption) (*shared.ListOpenWorkflowExecutionsResponse, error) + ListTaskListPartitions(context.Context, *shared.ListTaskListPartitionsRequest, ...yarpc.CallOption) (*shared.ListTaskListPartitionsResponse, error) + ListWorkflowExecutions(context.Context, *shared.ListWorkflowExecutionsRequest, ...yarpc.CallOption) (*shared.ListWorkflowExecutionsResponse, error) + PollForActivityTask(context.Context, *shared.PollForActivityTaskRequest, ...yarpc.CallOption) (*shared.PollForActivityTaskResponse, error) + PollForDecisionTask(context.Context, *shared.PollForDecisionTaskRequest, ...yarpc.CallOption) (*shared.PollForDecisionTaskResponse, error) + QueryWorkflow(context.Context, *shared.QueryWorkflowRequest, ...yarpc.CallOption) (*shared.QueryWorkflowResponse, error) + RecordActivityTaskHeartbeat(context.Context, *shared.RecordActivityTaskHeartbeatRequest, ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) + RecordActivityTaskHeartbeatByID(context.Context, *shared.RecordActivityTaskHeartbeatByIDRequest, ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) + RegisterDomain(context.Context, *shared.RegisterDomainRequest, ...yarpc.CallOption) error + RequestCancelWorkflowExecution(context.Context, *shared.RequestCancelWorkflowExecutionRequest, ...yarpc.CallOption) error + ResetStickyTaskList(context.Context, *shared.ResetStickyTaskListRequest, ...yarpc.CallOption) (*shared.ResetStickyTaskListResponse, error) + ResetWorkflowExecution(context.Context, *shared.ResetWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.ResetWorkflowExecutionResponse, error) + RespondActivityTaskCanceled(context.Context, *shared.RespondActivityTaskCanceledRequest, ...yarpc.CallOption) error + RespondActivityTaskCanceledByID(context.Context, *shared.RespondActivityTaskCanceledByIDRequest, ...yarpc.CallOption) error + RespondActivityTaskCompleted(context.Context, *shared.RespondActivityTaskCompletedRequest, ...yarpc.CallOption) error + RespondActivityTaskCompletedByID(context.Context, *shared.RespondActivityTaskCompletedByIDRequest, ...yarpc.CallOption) error + RespondActivityTaskFailed(context.Context, *shared.RespondActivityTaskFailedRequest, ...yarpc.CallOption) error + RespondActivityTaskFailedByID(context.Context, *shared.RespondActivityTaskFailedByIDRequest, ...yarpc.CallOption) error + RespondDecisionTaskCompleted(context.Context, *shared.RespondDecisionTaskCompletedRequest, ...yarpc.CallOption) (*shared.RespondDecisionTaskCompletedResponse, error) + RespondDecisionTaskFailed(context.Context, *shared.RespondDecisionTaskFailedRequest, ...yarpc.CallOption) error + RespondQueryTaskCompleted(context.Context, *shared.RespondQueryTaskCompletedRequest, ...yarpc.CallOption) error + ScanWorkflowExecutions(context.Context, *shared.ListWorkflowExecutionsRequest, ...yarpc.CallOption) (*shared.ListWorkflowExecutionsResponse, error) + SignalWithStartWorkflowExecution(context.Context, *shared.SignalWithStartWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) + SignalWorkflowExecution(context.Context, *shared.SignalWorkflowExecutionRequest, ...yarpc.CallOption) error + StartWorkflowExecution(context.Context, *shared.StartWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) + TerminateWorkflowExecution(context.Context, *shared.TerminateWorkflowExecutionRequest, ...yarpc.CallOption) error + UpdateDomain(context.Context, *shared.UpdateDomainRequest, ...yarpc.CallOption) (*shared.UpdateDomainResponse, error) } diff --git a/client/frontend/thriftClient.go b/client/frontend/thriftClient.go new file mode 100644 index 00000000000..97df431ced4 --- /dev/null +++ b/client/frontend/thriftClient.go @@ -0,0 +1,191 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package frontend + +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/cadence/workflowserviceclient" + "github.com/uber/cadence/.gen/go/shared" +) + +type thriftClient struct { + c workflowserviceclient.Interface +} + +// NewThriftClient creates a new instance of Client with thrift protocol +func NewThriftClient(c workflowserviceclient.Interface) Client { + return thriftClient{c} +} + +func (t thriftClient) CountWorkflowExecutions(ctx context.Context, request *shared.CountWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*shared.CountWorkflowExecutionsResponse, error) { + return t.c.CountWorkflowExecutions(ctx, request, opts...) +} + +func (t thriftClient) DeprecateDomain(ctx context.Context, request *shared.DeprecateDomainRequest, opts ...yarpc.CallOption) error { + return t.c.DeprecateDomain(ctx, request, opts...) +} + +func (t thriftClient) DescribeDomain(ctx context.Context, request *shared.DescribeDomainRequest, opts ...yarpc.CallOption) (*shared.DescribeDomainResponse, error) { + return t.c.DescribeDomain(ctx, request, opts...) +} + +func (t thriftClient) DescribeTaskList(ctx context.Context, request *shared.DescribeTaskListRequest, opts ...yarpc.CallOption) (*shared.DescribeTaskListResponse, error) { + return t.c.DescribeTaskList(ctx, request, opts...) +} + +func (t thriftClient) DescribeWorkflowExecution(ctx context.Context, request *shared.DescribeWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.DescribeWorkflowExecutionResponse, error) { + return t.c.DescribeWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) GetClusterInfo(ctx context.Context, opts ...yarpc.CallOption) (*shared.ClusterInfo, error) { + return t.c.GetClusterInfo(ctx, opts...) +} + +func (t thriftClient) GetSearchAttributes(ctx context.Context, opts ...yarpc.CallOption) (*shared.GetSearchAttributesResponse, error) { + return t.c.GetSearchAttributes(ctx, opts...) +} + +func (t thriftClient) GetWorkflowExecutionHistory(ctx context.Context, request *shared.GetWorkflowExecutionHistoryRequest, opts ...yarpc.CallOption) (*shared.GetWorkflowExecutionHistoryResponse, error) { + return t.c.GetWorkflowExecutionHistory(ctx, request, opts...) +} + +func (t thriftClient) ListArchivedWorkflowExecutions(ctx context.Context, request *shared.ListArchivedWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*shared.ListArchivedWorkflowExecutionsResponse, error) { + return t.c.ListArchivedWorkflowExecutions(ctx, request, opts...) +} + +func (t thriftClient) ListClosedWorkflowExecutions(ctx context.Context, request *shared.ListClosedWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*shared.ListClosedWorkflowExecutionsResponse, error) { + return t.c.ListClosedWorkflowExecutions(ctx, request, opts...) +} + +func (t thriftClient) ListDomains(ctx context.Context, request *shared.ListDomainsRequest, opts ...yarpc.CallOption) (*shared.ListDomainsResponse, error) { + return t.c.ListDomains(ctx, request, opts...) +} + +func (t thriftClient) ListOpenWorkflowExecutions(ctx context.Context, request *shared.ListOpenWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*shared.ListOpenWorkflowExecutionsResponse, error) { + return t.c.ListOpenWorkflowExecutions(ctx, request, opts...) +} + +func (t thriftClient) ListTaskListPartitions(ctx context.Context, request *shared.ListTaskListPartitionsRequest, opts ...yarpc.CallOption) (*shared.ListTaskListPartitionsResponse, error) { + return t.c.ListTaskListPartitions(ctx, request, opts...) +} + +func (t thriftClient) ListWorkflowExecutions(ctx context.Context, request *shared.ListWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*shared.ListWorkflowExecutionsResponse, error) { + return t.c.ListWorkflowExecutions(ctx, request, opts...) +} + +func (t thriftClient) PollForActivityTask(ctx context.Context, request *shared.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*shared.PollForActivityTaskResponse, error) { + return t.c.PollForActivityTask(ctx, request, opts...) +} + +func (t thriftClient) PollForDecisionTask(ctx context.Context, request *shared.PollForDecisionTaskRequest, opts ...yarpc.CallOption) (*shared.PollForDecisionTaskResponse, error) { + return t.c.PollForDecisionTask(ctx, request, opts...) +} + +func (t thriftClient) QueryWorkflow(ctx context.Context, request *shared.QueryWorkflowRequest, opts ...yarpc.CallOption) (*shared.QueryWorkflowResponse, error) { + return t.c.QueryWorkflow(ctx, request, opts...) +} + +func (t thriftClient) RecordActivityTaskHeartbeat(ctx context.Context, request *shared.RecordActivityTaskHeartbeatRequest, opts ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) { + return t.c.RecordActivityTaskHeartbeat(ctx, request, opts...) +} + +func (t thriftClient) RecordActivityTaskHeartbeatByID(ctx context.Context, request *shared.RecordActivityTaskHeartbeatByIDRequest, opts ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) { + return t.c.RecordActivityTaskHeartbeatByID(ctx, request, opts...) +} + +func (t thriftClient) RegisterDomain(ctx context.Context, request *shared.RegisterDomainRequest, opts ...yarpc.CallOption) error { + return t.c.RegisterDomain(ctx, request, opts...) +} + +func (t thriftClient) RequestCancelWorkflowExecution(ctx context.Context, request *shared.RequestCancelWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + return t.c.RequestCancelWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) ResetStickyTaskList(ctx context.Context, request *shared.ResetStickyTaskListRequest, opts ...yarpc.CallOption) (*shared.ResetStickyTaskListResponse, error) { + return t.c.ResetStickyTaskList(ctx, request, opts...) +} + +func (t thriftClient) ResetWorkflowExecution(ctx context.Context, request *shared.ResetWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.ResetWorkflowExecutionResponse, error) { + return t.c.ResetWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskCanceled(ctx context.Context, request *shared.RespondActivityTaskCanceledRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskCanceled(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskCanceledByID(ctx context.Context, request *shared.RespondActivityTaskCanceledByIDRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskCanceledByID(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskCompleted(ctx context.Context, request *shared.RespondActivityTaskCompletedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskCompleted(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskCompletedByID(ctx context.Context, request *shared.RespondActivityTaskCompletedByIDRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskCompletedByID(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskFailed(ctx context.Context, request *shared.RespondActivityTaskFailedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskFailed(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskFailedByID(ctx context.Context, request *shared.RespondActivityTaskFailedByIDRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskFailedByID(ctx, request, opts...) +} + +func (t thriftClient) RespondDecisionTaskCompleted(ctx context.Context, request *shared.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (*shared.RespondDecisionTaskCompletedResponse, error) { + return t.c.RespondDecisionTaskCompleted(ctx, request, opts...) +} + +func (t thriftClient) RespondDecisionTaskFailed(ctx context.Context, request *shared.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondDecisionTaskFailed(ctx, request, opts...) +} + +func (t thriftClient) RespondQueryTaskCompleted(ctx context.Context, request *shared.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondQueryTaskCompleted(ctx, request, opts...) +} + +func (t thriftClient) ScanWorkflowExecutions(ctx context.Context, request *shared.ListWorkflowExecutionsRequest, opts ...yarpc.CallOption) (*shared.ListWorkflowExecutionsResponse, error) { + return t.c.ScanWorkflowExecutions(ctx, request, opts...) +} + +func (t thriftClient) SignalWithStartWorkflowExecution(ctx context.Context, request *shared.SignalWithStartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) { + return t.c.SignalWithStartWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) SignalWorkflowExecution(ctx context.Context, request *shared.SignalWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + return t.c.SignalWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) StartWorkflowExecution(ctx context.Context, request *shared.StartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) { + return t.c.StartWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) TerminateWorkflowExecution(ctx context.Context, request *shared.TerminateWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + return t.c.TerminateWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) UpdateDomain(ctx context.Context, request *shared.UpdateDomainRequest, opts ...yarpc.CallOption) (*shared.UpdateDomainResponse, error) { + return t.c.UpdateDomain(ctx, request, opts...) +} diff --git a/client/history/client.go b/client/history/client.go index 2d1c9a76df0..2c716487db1 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -29,7 +29,6 @@ import ( "go.uber.org/yarpc" h "github.com/uber/cadence/.gen/go/history" - "github.com/uber/cadence/.gen/go/history/historyserviceclient" "github.com/uber/cadence/.gen/go/replicator" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" @@ -79,7 +78,7 @@ func (c *clientImpl) StartWorkflowExecution( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *workflow.StartWorkflowExecutionResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -104,7 +103,7 @@ func (c *clientImpl) GetMutableState( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.GetMutableStateResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -129,7 +128,7 @@ func (c *clientImpl) PollMutableState( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.PollMutableStateResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -150,7 +149,7 @@ func (c *clientImpl) DescribeHistoryHost( ) (*workflow.DescribeHistoryHostResponse, error) { var err error - var client historyserviceclient.Interface + var client Client if request.ShardIdForHost != nil { client, err = c.getClientForShardID(int(request.GetShardIdForHost())) @@ -161,7 +160,7 @@ func (c *clientImpl) DescribeHistoryHost( if err != nil { return nil, err } - client = ret.(historyserviceclient.Interface) + client = ret.(Client) } if err != nil { return nil, err @@ -169,7 +168,7 @@ func (c *clientImpl) DescribeHistoryHost( opts = common.AggregateYarpcOptions(ctx, opts...) var response *workflow.DescribeHistoryHostResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -189,7 +188,7 @@ func (c *clientImpl) RemoveTask( opts ...yarpc.CallOption, ) error { var err error - var client historyserviceclient.Interface + var client Client if request.ShardID != nil { client, err = c.getClientForShardID(int(request.GetShardID())) if err != nil { @@ -197,7 +196,7 @@ func (c *clientImpl) RemoveTask( } } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -216,7 +215,7 @@ func (c *clientImpl) CloseShard( ) error { var err error - var client historyserviceclient.Interface + var client Client if request.ShardID != nil { client, err = c.getClientForShardID(int(request.GetShardID())) if err != nil { @@ -224,7 +223,7 @@ func (c *clientImpl) CloseShard( } } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -246,7 +245,7 @@ func (c *clientImpl) ResetQueue( ) error { var err error - var client historyserviceclient.Interface + var client Client if request.ShardID != nil { client, err = c.getClientForShardID(int(request.GetShardID())) if err != nil { @@ -254,7 +253,7 @@ func (c *clientImpl) ResetQueue( } } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -276,7 +275,7 @@ func (c *clientImpl) DescribeQueue( ) (*workflow.DescribeQueueResponse, error) { var err error - var client historyserviceclient.Interface + var client Client if request.ShardID != nil { client, err = c.getClientForShardID(int(request.GetShardID())) if err != nil { @@ -285,7 +284,7 @@ func (c *clientImpl) DescribeQueue( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *workflow.DescribeQueueResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -311,7 +310,7 @@ func (c *clientImpl) DescribeMutableState( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.DescribeMutableStateResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -336,7 +335,7 @@ func (c *clientImpl) ResetStickyTaskList( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.ResetStickyTaskListResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -361,7 +360,7 @@ func (c *clientImpl) DescribeWorkflowExecution( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *workflow.DescribeWorkflowExecutionResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -386,7 +385,7 @@ func (c *clientImpl) RecordDecisionTaskStarted( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.RecordDecisionTaskStartedResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -411,7 +410,7 @@ func (c *clientImpl) RecordActivityTaskStarted( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.RecordActivityTaskStartedResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -440,7 +439,7 @@ func (c *clientImpl) RespondDecisionTaskCompleted( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.RespondDecisionTaskCompletedResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() response, err = client.RespondDecisionTaskCompleted(ctx, request, opts...) @@ -464,7 +463,7 @@ func (c *clientImpl) RespondDecisionTaskFailed( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RespondDecisionTaskFailed(ctx, request, opts...) @@ -487,7 +486,7 @@ func (c *clientImpl) RespondActivityTaskCompleted( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RespondActivityTaskCompleted(ctx, request, opts...) @@ -510,7 +509,7 @@ func (c *clientImpl) RespondActivityTaskFailed( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RespondActivityTaskFailed(ctx, request, opts...) @@ -533,7 +532,7 @@ func (c *clientImpl) RespondActivityTaskCanceled( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RespondActivityTaskCanceled(ctx, request, opts...) @@ -557,7 +556,7 @@ func (c *clientImpl) RecordActivityTaskHeartbeat( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *workflow.RecordActivityTaskHeartbeatResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -581,7 +580,7 @@ func (c *clientImpl) RequestCancelWorkflowExecution( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RequestCancelWorkflowExecution(ctx, request, opts...) @@ -599,7 +598,7 @@ func (c *clientImpl) SignalWorkflowExecution( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.SignalWorkflowExecution(ctx, request, opts...) @@ -620,7 +619,7 @@ func (c *clientImpl) SignalWithStartWorkflowExecution( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *workflow.StartWorkflowExecutionResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -644,7 +643,7 @@ func (c *clientImpl) RemoveSignalMutableState( if err != nil { return err } - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RemoveSignalMutableState(ctx, request) @@ -664,7 +663,7 @@ func (c *clientImpl) TerminateWorkflowExecution( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.TerminateWorkflowExecution(ctx, request, opts...) @@ -684,7 +683,7 @@ func (c *clientImpl) ResetWorkflowExecution( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *workflow.ResetWorkflowExecutionResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() response, err = client.ResetWorkflowExecution(ctx, request, opts...) @@ -707,7 +706,7 @@ func (c *clientImpl) ScheduleDecisionTask( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.ScheduleDecisionTask(ctx, request, opts...) @@ -726,7 +725,7 @@ func (c *clientImpl) RecordChildExecutionCompleted( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RecordChildExecutionCompleted(ctx, request, opts...) @@ -745,7 +744,7 @@ func (c *clientImpl) ReplicateEventsV2( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.ReplicateEventsV2(ctx, request, opts...) @@ -767,7 +766,7 @@ func (c *clientImpl) SyncShardStatus( } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.SyncShardStatus(ctx, request, opts...) @@ -787,7 +786,7 @@ func (c *clientImpl) SyncActivity( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.SyncActivity(ctx, request, opts...) @@ -807,7 +806,7 @@ func (c *clientImpl) QueryWorkflow( } opts = common.AggregateYarpcOptions(ctx, opts...) var response *h.QueryWorkflowResponse - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { var err error ctx, cancel := c.createContext(ctx) defer cancel() @@ -826,7 +825,7 @@ func (c *clientImpl) GetReplicationMessages( request *replicator.GetReplicationMessagesRequest, opts ...yarpc.CallOption, ) (*replicator.GetReplicationMessagesResponse, error) { - requestsByClient := make(map[historyserviceclient.Interface]*replicator.GetReplicationMessagesRequest) + requestsByClient := make(map[Client]*replicator.GetReplicationMessagesRequest) for _, token := range request.Tokens { client, err := c.getClientForShardID(int(token.GetShardID())) @@ -849,7 +848,7 @@ func (c *clientImpl) GetReplicationMessages( respChan := make(chan *replicator.GetReplicationMessagesResponse, len(requestsByClient)) errChan := make(chan error, 1) for client, req := range requestsByClient { - go func(client historyserviceclient.Interface, request *replicator.GetReplicationMessagesRequest) { + go func(client Client, request *replicator.GetReplicationMessagesRequest) { defer wg.Done() ctx, cancel := c.createContext(ctx) @@ -916,7 +915,7 @@ func (c *clientImpl) ReapplyEvents( return err } opts = common.AggregateYarpcOptions(ctx, opts...) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.ReapplyEvents(ctx, request, opts...) @@ -973,7 +972,7 @@ func (c *clientImpl) RefreshWorkflowTasks( opts ...yarpc.CallOption, ) error { client, err := c.getClientForWorkflowID(request.GetRequest().GetExecution().GetWorkflowId()) - op := func(ctx context.Context, client historyserviceclient.Interface) error { + op := func(ctx context.Context, client Client) error { ctx, cancel := c.createContext(ctx) defer cancel() return client.RefreshWorkflowTasks(ctx, request, opts...) @@ -987,7 +986,7 @@ func (c *clientImpl) NotifyFailoverMarkers( request *h.NotifyFailoverMarkersRequest, opts ...yarpc.CallOption, ) error { - requestsByClient := make(map[historyserviceclient.Interface]*h.NotifyFailoverMarkersRequest) + requestsByClient := make(map[Client]*h.NotifyFailoverMarkersRequest) for _, token := range request.GetFailoverMarkerTokens() { marker := token.GetFailoverMarker() @@ -1009,7 +1008,7 @@ func (c *clientImpl) NotifyFailoverMarkers( wg.Add(len(requestsByClient)) respChan := make(chan error, len(requestsByClient)) for client, req := range requestsByClient { - go func(client historyserviceclient.Interface, request *h.NotifyFailoverMarkersRequest) { + go func(client Client, request *h.NotifyFailoverMarkersRequest) { defer wg.Done() ctx, cancel := c.createContext(ctx) @@ -1041,28 +1040,28 @@ func (c *clientImpl) createContext(parent context.Context) (context.Context, con return context.WithTimeout(parent, c.timeout) } -func (c *clientImpl) getClientForWorkflowID(workflowID string) (historyserviceclient.Interface, error) { +func (c *clientImpl) getClientForWorkflowID(workflowID string) (Client, error) { key := common.WorkflowIDToHistoryShard(workflowID, c.numberOfShards) return c.getClientForShardID(key) } -func (c *clientImpl) getClientForDomainID(domainID string) (historyserviceclient.Interface, error) { +func (c *clientImpl) getClientForDomainID(domainID string) (Client, error) { key := common.DomainIDToHistoryShard(domainID, c.numberOfShards) return c.getClientForShardID(key) } -func (c *clientImpl) getClientForShardID(shardID int) (historyserviceclient.Interface, error) { +func (c *clientImpl) getClientForShardID(shardID int) (Client, error) { client, err := c.clients.GetClientForKey(string(shardID)) if err != nil { return nil, err } - return client.(historyserviceclient.Interface), nil + return client.(Client), nil } func (c *clientImpl) executeWithRedirect( ctx context.Context, - client historyserviceclient.Interface, - op func(ctx context.Context, client historyserviceclient.Interface) error, + client Client, + op func(ctx context.Context, client Client) error, ) error { var err error if ctx == nil { @@ -1082,7 +1081,7 @@ redirectLoop: if err != nil { return err } - client = ret.(historyserviceclient.Interface) + client = ret.(Client) continue redirectLoop } } diff --git a/client/history/interface.go b/client/history/interface.go index ffbb772d72b..0849164a7d4 100644 --- a/client/history/interface.go +++ b/client/history/interface.go @@ -21,10 +21,54 @@ package history import ( - "github.com/uber/cadence/.gen/go/history/historyserviceclient" + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" ) // Client is the interface exposed by history service client type Client interface { - historyserviceclient.Interface + CloseShard(context.Context, *shared.CloseShardRequest, ...yarpc.CallOption) error + DescribeHistoryHost(context.Context, *shared.DescribeHistoryHostRequest, ...yarpc.CallOption) (*shared.DescribeHistoryHostResponse, error) + DescribeMutableState(context.Context, *history.DescribeMutableStateRequest, ...yarpc.CallOption) (*history.DescribeMutableStateResponse, error) + DescribeQueue(context.Context, *shared.DescribeQueueRequest, ...yarpc.CallOption) (*shared.DescribeQueueResponse, error) + DescribeWorkflowExecution(context.Context, *history.DescribeWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.DescribeWorkflowExecutionResponse, error) + GetDLQReplicationMessages(context.Context, *replicator.GetDLQReplicationMessagesRequest, ...yarpc.CallOption) (*replicator.GetDLQReplicationMessagesResponse, error) + GetMutableState(context.Context, *history.GetMutableStateRequest, ...yarpc.CallOption) (*history.GetMutableStateResponse, error) + GetReplicationMessages(context.Context, *replicator.GetReplicationMessagesRequest, ...yarpc.CallOption) (*replicator.GetReplicationMessagesResponse, error) + MergeDLQMessages(context.Context, *replicator.MergeDLQMessagesRequest, ...yarpc.CallOption) (*replicator.MergeDLQMessagesResponse, error) + NotifyFailoverMarkers(context.Context, *history.NotifyFailoverMarkersRequest, ...yarpc.CallOption) error + PollMutableState(context.Context, *history.PollMutableStateRequest, ...yarpc.CallOption) (*history.PollMutableStateResponse, error) + PurgeDLQMessages(context.Context, *replicator.PurgeDLQMessagesRequest, ...yarpc.CallOption) error + QueryWorkflow(context.Context, *history.QueryWorkflowRequest, ...yarpc.CallOption) (*history.QueryWorkflowResponse, error) + ReadDLQMessages(context.Context, *replicator.ReadDLQMessagesRequest, ...yarpc.CallOption) (*replicator.ReadDLQMessagesResponse, error) + ReapplyEvents(context.Context, *history.ReapplyEventsRequest, ...yarpc.CallOption) error + RecordActivityTaskHeartbeat(context.Context, *history.RecordActivityTaskHeartbeatRequest, ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) + RecordActivityTaskStarted(context.Context, *history.RecordActivityTaskStartedRequest, ...yarpc.CallOption) (*history.RecordActivityTaskStartedResponse, error) + RecordChildExecutionCompleted(context.Context, *history.RecordChildExecutionCompletedRequest, ...yarpc.CallOption) error + RecordDecisionTaskStarted(context.Context, *history.RecordDecisionTaskStartedRequest, ...yarpc.CallOption) (*history.RecordDecisionTaskStartedResponse, error) + RefreshWorkflowTasks(context.Context, *history.RefreshWorkflowTasksRequest, ...yarpc.CallOption) error + RemoveSignalMutableState(context.Context, *history.RemoveSignalMutableStateRequest, ...yarpc.CallOption) error + RemoveTask(context.Context, *shared.RemoveTaskRequest, ...yarpc.CallOption) error + ReplicateEventsV2(context.Context, *history.ReplicateEventsV2Request, ...yarpc.CallOption) error + RequestCancelWorkflowExecution(context.Context, *history.RequestCancelWorkflowExecutionRequest, ...yarpc.CallOption) error + ResetQueue(context.Context, *shared.ResetQueueRequest, ...yarpc.CallOption) error + ResetStickyTaskList(context.Context, *history.ResetStickyTaskListRequest, ...yarpc.CallOption) (*history.ResetStickyTaskListResponse, error) + ResetWorkflowExecution(context.Context, *history.ResetWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.ResetWorkflowExecutionResponse, error) + RespondActivityTaskCanceled(context.Context, *history.RespondActivityTaskCanceledRequest, ...yarpc.CallOption) error + RespondActivityTaskCompleted(context.Context, *history.RespondActivityTaskCompletedRequest, ...yarpc.CallOption) error + RespondActivityTaskFailed(context.Context, *history.RespondActivityTaskFailedRequest, ...yarpc.CallOption) error + RespondDecisionTaskCompleted(context.Context, *history.RespondDecisionTaskCompletedRequest, ...yarpc.CallOption) (*history.RespondDecisionTaskCompletedResponse, error) + RespondDecisionTaskFailed(context.Context, *history.RespondDecisionTaskFailedRequest, ...yarpc.CallOption) error + ScheduleDecisionTask(context.Context, *history.ScheduleDecisionTaskRequest, ...yarpc.CallOption) error + SignalWithStartWorkflowExecution(context.Context, *history.SignalWithStartWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) + SignalWorkflowExecution(context.Context, *history.SignalWorkflowExecutionRequest, ...yarpc.CallOption) error + StartWorkflowExecution(context.Context, *history.StartWorkflowExecutionRequest, ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) + SyncActivity(context.Context, *history.SyncActivityRequest, ...yarpc.CallOption) error + SyncShardStatus(context.Context, *history.SyncShardStatusRequest, ...yarpc.CallOption) error + TerminateWorkflowExecution(context.Context, *history.TerminateWorkflowExecutionRequest, ...yarpc.CallOption) error } diff --git a/client/history/thriftClient.go b/client/history/thriftClient.go new file mode 100644 index 00000000000..6efeab9276f --- /dev/null +++ b/client/history/thriftClient.go @@ -0,0 +1,197 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/history/historyserviceclient" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/uber/cadence/.gen/go/shared" +) + +type thriftClient struct { + c historyserviceclient.Interface +} + +// NewThriftClient creates a new instance of Client with thrift protocol +func NewThriftClient(c historyserviceclient.Interface) Client { + return thriftClient{c} +} + +func (t thriftClient) CloseShard(ctx context.Context, request *shared.CloseShardRequest, opts ...yarpc.CallOption) error { + return t.c.CloseShard(ctx, request, opts...) +} + +func (t thriftClient) DescribeHistoryHost(ctx context.Context, request *shared.DescribeHistoryHostRequest, opts ...yarpc.CallOption) (*shared.DescribeHistoryHostResponse, error) { + return t.c.DescribeHistoryHost(ctx, request, opts...) +} + +func (t thriftClient) DescribeMutableState(ctx context.Context, request *history.DescribeMutableStateRequest, opts ...yarpc.CallOption) (*history.DescribeMutableStateResponse, error) { + return t.c.DescribeMutableState(ctx, request, opts...) +} + +func (t thriftClient) DescribeQueue(ctx context.Context, request *shared.DescribeQueueRequest, opts ...yarpc.CallOption) (*shared.DescribeQueueResponse, error) { + return t.c.DescribeQueue(ctx, request, opts...) +} + +func (t thriftClient) DescribeWorkflowExecution(ctx context.Context, request *history.DescribeWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.DescribeWorkflowExecutionResponse, error) { + return t.c.DescribeWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) GetDLQReplicationMessages(ctx context.Context, request *replicator.GetDLQReplicationMessagesRequest, opts ...yarpc.CallOption) (*replicator.GetDLQReplicationMessagesResponse, error) { + return t.c.GetDLQReplicationMessages(ctx, request, opts...) +} + +func (t thriftClient) GetMutableState(ctx context.Context, request *history.GetMutableStateRequest, opts ...yarpc.CallOption) (*history.GetMutableStateResponse, error) { + return t.c.GetMutableState(ctx, request, opts...) +} + +func (t thriftClient) GetReplicationMessages(ctx context.Context, request *replicator.GetReplicationMessagesRequest, opts ...yarpc.CallOption) (*replicator.GetReplicationMessagesResponse, error) { + return t.c.GetReplicationMessages(ctx, request, opts...) +} + +func (t thriftClient) MergeDLQMessages(ctx context.Context, request *replicator.MergeDLQMessagesRequest, opts ...yarpc.CallOption) (*replicator.MergeDLQMessagesResponse, error) { + return t.c.MergeDLQMessages(ctx, request, opts...) +} + +func (t thriftClient) NotifyFailoverMarkers(ctx context.Context, request *history.NotifyFailoverMarkersRequest, opts ...yarpc.CallOption) error { + return t.c.NotifyFailoverMarkers(ctx, request, opts...) +} + +func (t thriftClient) PollMutableState(ctx context.Context, request *history.PollMutableStateRequest, opts ...yarpc.CallOption) (*history.PollMutableStateResponse, error) { + return t.c.PollMutableState(ctx, request, opts...) +} + +func (t thriftClient) PurgeDLQMessages(ctx context.Context, request *replicator.PurgeDLQMessagesRequest, opts ...yarpc.CallOption) error { + return t.c.PurgeDLQMessages(ctx, request, opts...) +} + +func (t thriftClient) QueryWorkflow(ctx context.Context, request *history.QueryWorkflowRequest, opts ...yarpc.CallOption) (*history.QueryWorkflowResponse, error) { + return t.c.QueryWorkflow(ctx, request, opts...) +} + +func (t thriftClient) ReadDLQMessages(ctx context.Context, request *replicator.ReadDLQMessagesRequest, opts ...yarpc.CallOption) (*replicator.ReadDLQMessagesResponse, error) { + return t.c.ReadDLQMessages(ctx, request, opts...) +} + +func (t thriftClient) ReapplyEvents(ctx context.Context, request *history.ReapplyEventsRequest, opts ...yarpc.CallOption) error { + return t.c.ReapplyEvents(ctx, request, opts...) +} + +func (t thriftClient) RecordActivityTaskHeartbeat(ctx context.Context, request *history.RecordActivityTaskHeartbeatRequest, opts ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) { + return t.c.RecordActivityTaskHeartbeat(ctx, request, opts...) +} + +func (t thriftClient) RecordActivityTaskStarted(ctx context.Context, request *history.RecordActivityTaskStartedRequest, opts ...yarpc.CallOption) (*history.RecordActivityTaskStartedResponse, error) { + return t.c.RecordActivityTaskStarted(ctx, request, opts...) +} + +func (t thriftClient) RecordChildExecutionCompleted(ctx context.Context, request *history.RecordChildExecutionCompletedRequest, opts ...yarpc.CallOption) error { + return t.c.RecordChildExecutionCompleted(ctx, request, opts...) +} + +func (t thriftClient) RecordDecisionTaskStarted(ctx context.Context, request *history.RecordDecisionTaskStartedRequest, opts ...yarpc.CallOption) (*history.RecordDecisionTaskStartedResponse, error) { + return t.c.RecordDecisionTaskStarted(ctx, request, opts...) +} + +func (t thriftClient) RefreshWorkflowTasks(ctx context.Context, request *history.RefreshWorkflowTasksRequest, opts ...yarpc.CallOption) error { + return t.c.RefreshWorkflowTasks(ctx, request, opts...) +} + +func (t thriftClient) RemoveSignalMutableState(ctx context.Context, request *history.RemoveSignalMutableStateRequest, opts ...yarpc.CallOption) error { + return t.c.RemoveSignalMutableState(ctx, request, opts...) +} + +func (t thriftClient) RemoveTask(ctx context.Context, request *shared.RemoveTaskRequest, opts ...yarpc.CallOption) error { + return t.c.RemoveTask(ctx, request, opts...) +} + +func (t thriftClient) ReplicateEventsV2(ctx context.Context, request *history.ReplicateEventsV2Request, opts ...yarpc.CallOption) error { + return t.c.ReplicateEventsV2(ctx, request, opts...) +} + +func (t thriftClient) RequestCancelWorkflowExecution(ctx context.Context, request *history.RequestCancelWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + return t.c.RequestCancelWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) ResetQueue(ctx context.Context, request *shared.ResetQueueRequest, opts ...yarpc.CallOption) error { + return t.c.ResetQueue(ctx, request, opts...) +} + +func (t thriftClient) ResetStickyTaskList(ctx context.Context, request *history.ResetStickyTaskListRequest, opts ...yarpc.CallOption) (*history.ResetStickyTaskListResponse, error) { + return t.c.ResetStickyTaskList(ctx, request, opts...) +} + +func (t thriftClient) ResetWorkflowExecution(ctx context.Context, request *history.ResetWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.ResetWorkflowExecutionResponse, error) { + return t.c.ResetWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskCanceled(ctx context.Context, request *history.RespondActivityTaskCanceledRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskCanceled(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskCompleted(ctx context.Context, request *history.RespondActivityTaskCompletedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskCompleted(ctx, request, opts...) +} + +func (t thriftClient) RespondActivityTaskFailed(ctx context.Context, request *history.RespondActivityTaskFailedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondActivityTaskFailed(ctx, request, opts...) +} + +func (t thriftClient) RespondDecisionTaskCompleted(ctx context.Context, request *history.RespondDecisionTaskCompletedRequest, opts ...yarpc.CallOption) (*history.RespondDecisionTaskCompletedResponse, error) { + return t.c.RespondDecisionTaskCompleted(ctx, request, opts...) +} + +func (t thriftClient) RespondDecisionTaskFailed(ctx context.Context, request *history.RespondDecisionTaskFailedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondDecisionTaskFailed(ctx, request, opts...) +} + +func (t thriftClient) ScheduleDecisionTask(ctx context.Context, request *history.ScheduleDecisionTaskRequest, opts ...yarpc.CallOption) error { + return t.c.ScheduleDecisionTask(ctx, request, opts...) +} + +func (t thriftClient) SignalWithStartWorkflowExecution(ctx context.Context, request *history.SignalWithStartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) { + return t.c.SignalWithStartWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) SignalWorkflowExecution(ctx context.Context, request *history.SignalWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + return t.c.SignalWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) StartWorkflowExecution(ctx context.Context, request *history.StartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.StartWorkflowExecutionResponse, error) { + return t.c.StartWorkflowExecution(ctx, request, opts...) +} + +func (t thriftClient) SyncActivity(ctx context.Context, request *history.SyncActivityRequest, opts ...yarpc.CallOption) error { + return t.c.SyncActivity(ctx, request, opts...) +} + +func (t thriftClient) SyncShardStatus(ctx context.Context, request *history.SyncShardStatusRequest, opts ...yarpc.CallOption) error { + return t.c.SyncShardStatus(ctx, request, opts...) +} + +func (t thriftClient) TerminateWorkflowExecution(ctx context.Context, request *history.TerminateWorkflowExecutionRequest, opts ...yarpc.CallOption) error { + return t.c.TerminateWorkflowExecution(ctx, request, opts...) +} diff --git a/client/matching/client.go b/client/matching/client.go index 36779a760ab..e52d0be2226 100644 --- a/client/matching/client.go +++ b/client/matching/client.go @@ -27,7 +27,6 @@ import ( "go.uber.org/yarpc" m "github.com/uber/cadence/.gen/go/matching" - "github.com/uber/cadence/.gen/go/matching/matchingserviceclient" workflow "github.com/uber/cadence/.gen/go/shared" "github.com/uber/cadence/common" "github.com/uber/cadence/common/persistence" @@ -224,10 +223,10 @@ func (c *clientImpl) createLongPollContext(parent context.Context) (context.Cont return context.WithTimeout(parent, c.longPollTimeout) } -func (c *clientImpl) getClientForTasklist(key string) (matchingserviceclient.Interface, error) { +func (c *clientImpl) getClientForTasklist(key string) (Client, error) { client, err := c.clients.GetClientForKey(key) if err != nil { return nil, err } - return client.(matchingserviceclient.Interface), nil + return client.(Client), nil } diff --git a/client/matching/interface.go b/client/matching/interface.go index f846c89b76e..47cd2a84266 100644 --- a/client/matching/interface.go +++ b/client/matching/interface.go @@ -21,10 +21,23 @@ package matching import ( - "github.com/uber/cadence/.gen/go/matching/matchingserviceclient" + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/matching" + "github.com/uber/cadence/.gen/go/shared" ) // Client is the interface exposed by matching service client type Client interface { - matchingserviceclient.Interface + AddActivityTask(context.Context, *matching.AddActivityTaskRequest, ...yarpc.CallOption) error + AddDecisionTask(context.Context, *matching.AddDecisionTaskRequest, ...yarpc.CallOption) error + CancelOutstandingPoll(context.Context, *matching.CancelOutstandingPollRequest, ...yarpc.CallOption) error + DescribeTaskList(context.Context, *matching.DescribeTaskListRequest, ...yarpc.CallOption) (*shared.DescribeTaskListResponse, error) + ListTaskListPartitions(context.Context, *matching.ListTaskListPartitionsRequest, ...yarpc.CallOption) (*shared.ListTaskListPartitionsResponse, error) + PollForActivityTask(context.Context, *matching.PollForActivityTaskRequest, ...yarpc.CallOption) (*shared.PollForActivityTaskResponse, error) + PollForDecisionTask(context.Context, *matching.PollForDecisionTaskRequest, ...yarpc.CallOption) (*matching.PollForDecisionTaskResponse, error) + QueryWorkflow(context.Context, *matching.QueryWorkflowRequest, ...yarpc.CallOption) (*shared.QueryWorkflowResponse, error) + RespondQueryTaskCompleted(context.Context, *matching.RespondQueryTaskCompletedRequest, ...yarpc.CallOption) error } diff --git a/client/matching/thriftClient.go b/client/matching/thriftClient.go new file mode 100644 index 00000000000..92c90cfe60a --- /dev/null +++ b/client/matching/thriftClient.go @@ -0,0 +1,76 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package matching + +import ( + "context" + + "go.uber.org/yarpc" + + "github.com/uber/cadence/.gen/go/matching" + "github.com/uber/cadence/.gen/go/matching/matchingserviceclient" + "github.com/uber/cadence/.gen/go/shared" +) + +type thriftClient struct { + c matchingserviceclient.Interface +} + +// NewThriftClient creates a new instance of Client with thrift protocol +func NewThriftClient(c matchingserviceclient.Interface) Client { + return thriftClient{c} +} + +func (t thriftClient) AddActivityTask(ctx context.Context, request *matching.AddActivityTaskRequest, opts ...yarpc.CallOption) error { + return t.c.AddActivityTask(ctx, request, opts...) +} + +func (t thriftClient) AddDecisionTask(ctx context.Context, request *matching.AddDecisionTaskRequest, opts ...yarpc.CallOption) error { + return t.c.AddDecisionTask(ctx, request, opts...) +} + +func (t thriftClient) CancelOutstandingPoll(ctx context.Context, request *matching.CancelOutstandingPollRequest, opts ...yarpc.CallOption) error { + return t.c.CancelOutstandingPoll(ctx, request, opts...) +} + +func (t thriftClient) DescribeTaskList(ctx context.Context, request *matching.DescribeTaskListRequest, opts ...yarpc.CallOption) (*shared.DescribeTaskListResponse, error) { + return t.c.DescribeTaskList(ctx, request, opts...) +} + +func (t thriftClient) ListTaskListPartitions(ctx context.Context, request *matching.ListTaskListPartitionsRequest, opts ...yarpc.CallOption) (*shared.ListTaskListPartitionsResponse, error) { + return t.c.ListTaskListPartitions(ctx, request, opts...) +} + +func (t thriftClient) PollForActivityTask(ctx context.Context, request *matching.PollForActivityTaskRequest, opts ...yarpc.CallOption) (*shared.PollForActivityTaskResponse, error) { + return t.c.PollForActivityTask(ctx, request, opts...) +} + +func (t thriftClient) PollForDecisionTask(ctx context.Context, request *matching.PollForDecisionTaskRequest, opts ...yarpc.CallOption) (*matching.PollForDecisionTaskResponse, error) { + return t.c.PollForDecisionTask(ctx, request, opts...) +} + +func (t thriftClient) QueryWorkflow(ctx context.Context, request *matching.QueryWorkflowRequest, opts ...yarpc.CallOption) (*shared.QueryWorkflowResponse, error) { + return t.c.QueryWorkflow(ctx, request, opts...) +} + +func (t thriftClient) RespondQueryTaskCompleted(ctx context.Context, request *matching.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error { + return t.c.RespondQueryTaskCompleted(ctx, request, opts...) +}