diff --git a/Makefile b/Makefile index 73e85d80d5e..61d8811d790 100644 --- a/Makefile +++ b/Makefile @@ -283,7 +283,7 @@ integration-test: clean-test-results @! grep -q "^--- FAIL" test.log integration-with-fault-injection-test: clean-test-results - @printf $(COLOR) "Run integration tests..." + @printf $(COLOR) "Run integration tests with fault injection..." $(foreach INTEG_TEST_DIR,$(INTEG_TEST_DIRS),\ @go test $(INTEG_TEST_DIR) -timeout=$(TEST_TIMEOUT) $(TEST_TAG) -race -PersistenceFaultInjectionRate=0.005 | tee -a test.log \ $(NEWLINE)) diff --git a/common/resource/fx.go b/common/resource/fx.go index dcb53407ce0..3909151f535 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -100,7 +100,6 @@ var Module = fx.Options( membership.MonitorLifetimeHooksModule, fx.Provide(ClientFactoryProvider), fx.Provide(ClientBeanProvider), - sdk.Module, fx.Provide(SdkClientFactoryProvider), fx.Provide(FrontedClientProvider), fx.Provide(GrpcListenerProvider), diff --git a/common/sdk/factory.go b/common/sdk/factory.go index 1c38582e09d..76d057f78e6 100644 --- a/common/sdk/factory.go +++ b/common/sdk/factory.go @@ -29,23 +29,28 @@ package sdk import ( "crypto/tls" "fmt" + "sync" sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/common" + "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" ) type ( ClientFactory interface { NewClient(namespaceName string, logger log.Logger) (sdkclient.Client, error) - NewSystemClient(logger log.Logger) (sdkclient.Client, error) + GetSystemClient(logger log.Logger) sdkclient.Client } clientFactory struct { - hostPort string - tlsConfig *tls.Config - metricsHandler *MetricsHandler + hostPort string + tlsConfig *tls.Config + metricsHandler *MetricsHandler + systemSdkClient sdkclient.Client + once *sync.Once } ) @@ -56,27 +61,47 @@ func NewClientFactory(hostPort string, tlsConfig *tls.Config, metricsHandler *Me hostPort: hostPort, tlsConfig: tlsConfig, metricsHandler: metricsHandler, + once: &sync.Once{}, } } func (f *clientFactory) NewClient(namespaceName string, logger log.Logger) (sdkclient.Client, error) { - sdkClient, err := sdkclient.NewClient(sdkclient.Options{ - HostPort: f.hostPort, - Namespace: namespaceName, - MetricsHandler: f.metricsHandler, - Logger: log.NewSdkLogger(logger), - ConnectionOptions: sdkclient.ConnectionOptions{ - TLS: f.tlsConfig, - DisableHealthCheck: true, - }, - }) - if err != nil { - return nil, fmt.Errorf("unable to create SDK client: %w", err) - } + var client sdkclient.Client + + // Retry for up to 1m, handles frontend service not ready + err := backoff.Retry(func() error { + sdkClient, err := sdkclient.NewClient(sdkclient.Options{ + HostPort: f.hostPort, + Namespace: namespaceName, + MetricsHandler: f.metricsHandler, + Logger: log.NewSdkLogger(logger), + ConnectionOptions: sdkclient.ConnectionOptions{ + TLS: f.tlsConfig, + }, + }) + if err != nil { + return fmt.Errorf("unable to create SDK client: %w", err) + } + + client = sdkClient + return nil + }, common.CreateSdkClientFactoryRetryPolicy(), common.IsContextDeadlineExceededErr) - return sdkClient, nil + return client, err } -func (f *clientFactory) NewSystemClient(logger log.Logger) (sdkclient.Client, error) { - return f.NewClient(common.SystemLocalNamespace, logger) +func (f *clientFactory) GetSystemClient(logger log.Logger) sdkclient.Client { + f.once.Do(func() { + client, err := f.NewClient(common.SystemLocalNamespace, logger) + + if err != nil { + logger.Fatal( + "error getting system sdk client", + tag.Error(err), + ) + } + + f.systemSdkClient = client + }) + return f.systemSdkClient } diff --git a/common/sdk/factory_mock.go b/common/sdk/factory_mock.go index f5d7e6d50b4..5bd79518da6 100644 --- a/common/sdk/factory_mock.go +++ b/common/sdk/factory_mock.go @@ -59,32 +59,31 @@ func (m *MockClientFactory) EXPECT() *MockClientFactoryMockRecorder { return m.recorder } -// NewClient mocks base method. -func (m *MockClientFactory) NewClient(namespaceName string, logger log.Logger) (client.Client, error) { +// GetSystemClient mocks base method. +func (m *MockClientFactory) GetSystemClient(logger log.Logger) client.Client { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewClient", namespaceName, logger) + ret := m.ctrl.Call(m, "GetSystemClient", logger) ret0, _ := ret[0].(client.Client) - ret1, _ := ret[1].(error) - return ret0, ret1 + return ret0 } -// NewClient indicates an expected call of NewClient. -func (mr *MockClientFactoryMockRecorder) NewClient(namespaceName, logger interface{}) *gomock.Call { +// GetSystemClient indicates an expected call of GetSystemClient. +func (mr *MockClientFactoryMockRecorder) GetSystemClient(logger interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewClient", reflect.TypeOf((*MockClientFactory)(nil).NewClient), namespaceName, logger) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSystemClient", reflect.TypeOf((*MockClientFactory)(nil).GetSystemClient), logger) } -// NewSystemClient mocks base method. -func (m *MockClientFactory) NewSystemClient(logger log.Logger) (client.Client, error) { +// NewClient mocks base method. +func (m *MockClientFactory) NewClient(namespaceName string, logger log.Logger) (client.Client, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewSystemClient", logger) + ret := m.ctrl.Call(m, "NewClient", namespaceName, logger) ret0, _ := ret[0].(client.Client) ret1, _ := ret[1].(error) return ret0, ret1 } -// NewSystemClient indicates an expected call of NewSystemClient. -func (mr *MockClientFactoryMockRecorder) NewSystemClient(logger interface{}) *gomock.Call { +// NewClient indicates an expected call of NewClient. +func (mr *MockClientFactoryMockRecorder) NewClient(namespaceName, logger interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewSystemClient", reflect.TypeOf((*MockClientFactory)(nil).NewSystemClient), logger) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewClient", reflect.TypeOf((*MockClientFactory)(nil).NewClient), namespaceName, logger) } diff --git a/common/sdk/fx.go b/common/sdk/fx.go deleted file mode 100644 index 9b35cc40028..00000000000 --- a/common/sdk/fx.go +++ /dev/null @@ -1,40 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package sdk - -import ( - sdkclient "go.temporal.io/sdk/client" - "go.uber.org/fx" - - "go.temporal.io/server/common/log" -) - -var Module = fx.Options( - fx.Provide(sdkSystemClientProvider), -) - -func sdkSystemClientProvider(sdkClientFactory ClientFactory, logger log.Logger) (sdkclient.Client, error) { - return sdkClientFactory.NewSystemClient(logger) -} diff --git a/common/util.go b/common/util.go index 21bc461ed41..8f02a49ad01 100644 --- a/common/util.go +++ b/common/util.go @@ -84,6 +84,8 @@ const ( replicationServiceBusyMaxInterval = 10 * time.Second replicationServiceBusyExpirationInterval = 30 * time.Second + sdkClientFactoryRetryExpirationInterval = time.Minute + defaultInitialInterval = time.Second defaultMaximumIntervalCoefficient = 100.0 defaultBackoffCoefficient = 2.0 @@ -202,6 +204,15 @@ func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy { return policy } +// CreateSdkClientFactoryRetryPolicy creates a retry policy to handle SdkClientFactory NewClient when frontend service is not ready +func CreateSdkClientFactoryRetryPolicy() backoff.RetryPolicy { + policy := backoff.NewExponentialRetryPolicy(frontendServiceOperationInitialInterval) + policy.SetMaximumInterval(frontendServiceOperationMaxInterval) + policy.SetExpirationInterval(sdkClientFactoryRetryExpirationInterval) + + return policy +} + // IsPersistenceTransientError checks if the error is a transient persistence error func IsPersistenceTransientError(err error) bool { switch err.(type) { diff --git a/host/client_integration_test.go b/host/client_integration_test.go index d76f92decc1..fb983147ee3 100644 --- a/host/client_integration_test.go +++ b/host/client_integration_test.go @@ -97,9 +97,6 @@ func (s *clientIntegrationSuite) SetupTest() { sdkClient, err := sdkclient.NewClient(sdkclient.Options{ HostPort: s.hostPort, Namespace: s.namespace, - ConnectionOptions: sdkclient.ConnectionOptions{ - DisableHealthCheck: true, - }, }) if err != nil { s.Logger.Fatal("Error when creating SDK client", tag.Error(err)) @@ -249,9 +246,6 @@ func (s *clientIntegrationSuite) startWorkerWithDataConverter(tl string, dataCon HostPort: s.hostPort, Namespace: s.namespace, DataConverter: dataConverter, - ConnectionOptions: sdkclient.ConnectionOptions{ - DisableHealthCheck: true, - }, }) if err != nil { s.Logger.Fatal("Error when creating SDK client", tag.Error(err)) diff --git a/service/frontend/adminHandler.go b/service/frontend/adminHandler.go index cbf4179ff7d..2cba91c2002 100644 --- a/service/frontend/adminHandler.go +++ b/service/frontend/adminHandler.go @@ -71,6 +71,7 @@ import ( "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/rpc/interceptor" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/tasks" @@ -106,7 +107,7 @@ type ( clientFactory serverClient.Factory clientBean serverClient.Bean historyClient historyservice.HistoryServiceClient - sdkClient sdkclient.Client + sdkClientFactory sdk.ClientFactory membershipMonitor membership.Monitor metricsClient metrics.Client namespaceRegistry namespace.Registry @@ -132,7 +133,7 @@ type ( ClientFactory serverClient.Factory ClientBean serverClient.Bean HistoryClient historyservice.HistoryServiceClient - SdkSystemClient sdkclient.Client + sdkClientFactory sdk.ClientFactory MembershipMonitor membership.Monitor ArchiverProvider provider.ArchiverProvider MetricsClient metrics.Client @@ -193,7 +194,7 @@ func NewAdminHandler( clientFactory: args.ClientFactory, clientBean: args.ClientBean, historyClient: args.HistoryClient, - sdkClient: args.SdkSystemClient, + sdkClientFactory: args.sdkClientFactory, membershipMonitor: args.MembershipMonitor, metricsClient: args.MetricsClient, namespaceRegistry: args.NamespaceRegistry, @@ -278,7 +279,8 @@ func (adh *AdminHandler) AddSearchAttributes(ctx context.Context, request *admin SkipSchemaUpdate: request.GetSkipSchemaUpdate(), } - run, err := adh.sdkClient.ExecuteWorkflow( + sdkClient := adh.sdkClientFactory.GetSystemClient(adh.logger) + run, err := sdkClient.ExecuteWorkflow( ctx, sdkclient.StartWorkflowOptions{ TaskQueue: worker.DefaultWorkerTaskQueue, @@ -376,7 +378,9 @@ func (adh *AdminHandler) GetSearchAttributes(ctx context.Context, request *admin func (adh *AdminHandler) getSearchAttributes(ctx context.Context, indexName string, runID string) (*adminservice.GetSearchAttributesResponse, error) { var lastErr error - descResp, err := adh.sdkClient.DescribeWorkflowExecution(ctx, addsearchattributes.WorkflowName, runID) + + sdkClient := adh.sdkClientFactory.GetSystemClient(adh.logger) + descResp, err := sdkClient.DescribeWorkflowExecution(ctx, addsearchattributes.WorkflowName, runID) var wfInfo *workflowpb.WorkflowExecutionInfo if err != nil { // NotFound can happen when no search attributes were added and the workflow has never been executed. diff --git a/service/frontend/adminHandler_test.go b/service/frontend/adminHandler_test.go index 7ab1189bd5b..5d8b77876f1 100644 --- a/service/frontend/adminHandler_test.go +++ b/service/frontend/adminHandler_test.go @@ -70,11 +70,10 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockResource *resource.Test - mockHistoryClient *historyservicemock.MockHistoryServiceClient - mockSdkSystemClient *sdkmocks.Client - mockNamespaceCache *namespace.MockRegistry + controller *gomock.Controller + mockResource *resource.Test + mockHistoryClient *historyservicemock.MockHistoryServiceClient + mockNamespaceCache *namespace.MockRegistry mockExecutionMgr *persistence.MockExecutionManager mockVisibilityMgr *manager.MockVisibilityManager @@ -114,8 +113,6 @@ func (s *adminHandlerSuite) SetupTest() { s.mockVisibilityMgr = manager.NewMockVisibilityManager(s.controller) s.mockProducer = persistence.NewMockNamespaceReplicationQueue(s.controller) - s.mockSdkSystemClient = &sdkmocks.Client{} - params := &resource.BootstrapParams{ PersistenceConfig: config.Persistence{ NumHistoryShards: 1, @@ -138,7 +135,7 @@ func (s *adminHandlerSuite) SetupTest() { s.mockResource.GetClientFactory(), s.mockResource.GetClientBean(), s.mockResource.GetHistoryClient(), - s.mockSdkSystemClient, + s.mockResource.GetSDKClientFactory(), s.mockResource.GetMembershipMonitor(), s.mockResource.GetArchiverProvider(), s.mockResource.GetMetricsClient(), @@ -538,8 +535,11 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() { }) } + mockSdkClient := &sdkmocks.Client{} + s.mockResource.SDKClientFactory.EXPECT().GetSystemClient(gomock.Any()).Return(mockSdkClient).AnyTimes() + // Start workflow failed. - s.mockSdkSystemClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(nil, errors.New("start failed")).Once() + mockSdkClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(nil, errors.New("start failed")).Once() resp, err := handler.AddSearchAttributes(ctx, &adminservice.AddSearchAttributesRequest{ SearchAttributes: map[string]enumspb.IndexedValueType{ "CustomAttr": enumspb.INDEXED_VALUE_TYPE_KEYWORD, @@ -552,7 +552,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() { // Workflow failed. mockRun := &sdkmocks.WorkflowRun{} mockRun.On("Get", mock.Anything, nil).Return(errors.New("workflow failed")).Once() - s.mockSdkSystemClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(mockRun, nil) + mockSdkClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(mockRun, nil) resp, err = handler.AddSearchAttributes(ctx, &adminservice.AddSearchAttributesRequest{ SearchAttributes: map[string]enumspb.IndexedValueType{ "CustomAttr": enumspb.INDEXED_VALUE_TYPE_KEYWORD, @@ -573,7 +573,7 @@ func (s *adminHandlerSuite) Test_AddSearchAttributes() { s.NoError(err) s.NotNil(resp) mockRun.AssertExpectations(s.T()) - s.mockSdkSystemClient.AssertExpectations(s.T()) + mockSdkClient.AssertExpectations(s.T()) } func (s *adminHandlerSuite) Test_GetSearchAttributes() { @@ -585,8 +585,11 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes() { s.Equal(&serviceerror.InvalidArgument{Message: "Request is nil."}, err) s.Nil(resp) + mockSdkClient := &sdkmocks.Client{} + s.mockResource.SDKClientFactory.EXPECT().GetSystemClient(gomock.Any()).Return(mockSdkClient).AnyTimes() + // Elasticsearch is not configured - s.mockSdkSystemClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( + mockSdkClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( &workflowservice.DescribeWorkflowExecutionResponse{}, nil).Once() s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() @@ -602,7 +605,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes() { }, } - s.mockSdkSystemClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( + mockSdkClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( &workflowservice.DescribeWorkflowExecutionResponse{}, nil).Once() s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "random-index-name").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() @@ -610,7 +613,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes() { s.NoError(err) s.NotNil(resp) - s.mockSdkSystemClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( + mockSdkClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( &workflowservice.DescribeWorkflowExecutionResponse{}, nil).Once() s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "another-index-name").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("another-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() @@ -618,7 +621,7 @@ func (s *adminHandlerSuite) Test_GetSearchAttributes() { s.NoError(err) s.NotNil(resp) - s.mockSdkSystemClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( + mockSdkClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( nil, errors.New("random error")).Once() s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "random-index-name").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("random-index-name", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() diff --git a/service/frontend/fx.go b/service/frontend/fx.go index f206431cdb4..5eada1a07dc 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -28,7 +28,6 @@ import ( "context" "net" - sdkclient "go.temporal.io/sdk/client" "go.uber.org/fx" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -60,6 +59,7 @@ import ( "go.temporal.io/server/common/resource" "go.temporal.io/server/common/rpc" "go.temporal.io/server/common/rpc/interceptor" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/frontend/configs" @@ -379,7 +379,7 @@ func AdminHandlerProvider( clientFactory client.Factory, clientBean client.Bean, historyClient historyservice.HistoryServiceClient, - sdkSystemClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, membershipMonitor membership.Monitor, archiverProvider provider.ArchiverProvider, metricsClient metrics.Client, @@ -406,7 +406,7 @@ func AdminHandlerProvider( clientFactory, clientBean, historyClient, - sdkSystemClient, + sdkClientFactory, membershipMonitor, archiverProvider, metricsClient, @@ -424,7 +424,7 @@ func OperatorHandlerProvider( esConfig *esclient.Config, esClient esclient.Client, logger resource.SnTaggedLogger, - sdkSystemClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, metricsClient metrics.Client, saProvider searchattribute.Provider, saManager searchattribute.Manager, @@ -434,7 +434,7 @@ func OperatorHandlerProvider( esConfig, esClient, logger, - sdkSystemClient, + sdkClientFactory, metricsClient, saProvider, saManager, diff --git a/service/frontend/operator_handler.go b/service/frontend/operator_handler.go index 5cf27b1b371..95f5c4869f4 100644 --- a/service/frontend/operator_handler.go +++ b/service/frontend/operator_handler.go @@ -40,6 +40,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" esclient "go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/service/worker" "go.temporal.io/server/service/worker/addsearchattributes" @@ -55,26 +56,26 @@ type ( OperatorHandlerImpl struct { status int32 - healthStatus int32 - logger log.Logger - esConfig *esclient.Config - esClient esclient.Client - sdkClient sdkclient.Client - metricsClient metrics.Client - saProvider searchattribute.Provider - saManager searchattribute.Manager - healthServer *health.Server + healthStatus int32 + logger log.Logger + esConfig *esclient.Config + esClient esclient.Client + sdkClientFactory sdk.ClientFactory + metricsClient metrics.Client + saProvider searchattribute.Provider + saManager searchattribute.Manager + healthServer *health.Server } NewOperatorHandlerImplArgs struct { - EsConfig *esclient.Config - EsClient esclient.Client - Logger log.Logger - SdkSystemClient sdkclient.Client - MetricsClient metrics.Client - SaProvider searchattribute.Provider - SaManager searchattribute.Manager - healthServer *health.Server + EsConfig *esclient.Config + EsClient esclient.Client + Logger log.Logger + sdkClientFactory sdk.ClientFactory + MetricsClient metrics.Client + SaProvider searchattribute.Provider + SaManager searchattribute.Manager + healthServer *health.Server } ) @@ -84,15 +85,15 @@ func NewOperatorHandlerImpl( ) *OperatorHandlerImpl { handler := &OperatorHandlerImpl{ - logger: args.Logger, - status: common.DaemonStatusInitialized, - esConfig: args.EsConfig, - esClient: args.EsClient, - sdkClient: args.SdkSystemClient, - metricsClient: args.MetricsClient, - saProvider: args.SaProvider, - saManager: args.SaManager, - healthServer: args.healthServer, + logger: args.Logger, + status: common.DaemonStatusInitialized, + esConfig: args.EsConfig, + esClient: args.EsClient, + sdkClientFactory: args.sdkClientFactory, + metricsClient: args.MetricsClient, + saProvider: args.SaProvider, + saManager: args.SaManager, + healthServer: args.healthServer, } return handler @@ -163,7 +164,8 @@ func (h *OperatorHandlerImpl) AddSearchAttributes(ctx context.Context, request * SkipSchemaUpdate: false, } - run, err := h.sdkClient.ExecuteWorkflow( + sdkClient := h.sdkClientFactory.GetSystemClient(h.logger) + run, err := sdkClient.ExecuteWorkflow( ctx, sdkclient.StartWorkflowOptions{ TaskQueue: worker.DefaultWorkerTaskQueue, diff --git a/service/frontend/operator_handler_test.go b/service/frontend/operator_handler_test.go index c40eadd22f2..6102fc39bbd 100644 --- a/service/frontend/operator_handler_test.go +++ b/service/frontend/operator_handler_test.go @@ -39,7 +39,6 @@ import ( "github.com/stretchr/testify/suite" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" - "go.temporal.io/api/workflowservice/v1" sdkmocks "go.temporal.io/sdk/mocks" "go.temporal.io/server/common/metrics" @@ -53,9 +52,8 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockResource *resource.Test - mockSdkSystemClient *sdkmocks.Client + controller *gomock.Controller + mockResource *resource.Test handler *OperatorHandlerImpl } @@ -72,13 +70,11 @@ func (s *operatorHandlerSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockResource = resource.NewTest(s.controller, metrics.Frontend) - s.mockSdkSystemClient = &sdkmocks.Client{} - args := NewOperatorHandlerImplArgs{ nil, s.mockResource.ESClient, s.mockResource.Logger, - s.mockSdkSystemClient, + s.mockResource.GetSDKClientFactory(), s.mockResource.GetMetricsClient(), s.mockResource.GetSearchAttributesProvider(), s.mockResource.GetSearchAttributesManager(), @@ -189,8 +185,11 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { }) } + mockSdkClient := &sdkmocks.Client{} + s.mockResource.SDKClientFactory.EXPECT().GetSystemClient(gomock.Any()).Return(mockSdkClient).AnyTimes() + // Start workflow failed. - s.mockSdkSystemClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(nil, errors.New("start failed")).Once() + mockSdkClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(nil, errors.New("start failed")).Once() resp, err := handler.AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ SearchAttributes: map[string]enumspb.IndexedValueType{ "CustomAttr": enumspb.INDEXED_VALUE_TYPE_KEYWORD, @@ -205,7 +204,7 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { mockRun.On("Get", mock.Anything, nil).Return(errors.New("workflow failed")).Once() const RunId = "31d8ebd6-93a7-11ec-b909-0242ac120002" mockRun.On("GetRunID").Return(RunId).Once() - s.mockSdkSystemClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(mockRun, nil) + mockSdkClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, "temporal-sys-add-search-attributes-workflow", mock.Anything).Return(mockRun, nil) resp, err = handler.AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ SearchAttributes: map[string]enumspb.IndexedValueType{ "CustomAttr": enumspb.INDEXED_VALUE_TYPE_KEYWORD, @@ -227,7 +226,7 @@ func (s *operatorHandlerSuite) Test_AddSearchAttributes() { s.NoError(err) s.NotNil(resp) mockRun.AssertExpectations(s.T()) - s.mockSdkSystemClient.AssertExpectations(s.T()) + mockSdkClient.AssertExpectations(s.T()) } func (s *operatorHandlerSuite) Test_ListSearchAttributes() { @@ -240,8 +239,6 @@ func (s *operatorHandlerSuite) Test_ListSearchAttributes() { s.Nil(resp) // Elasticsearch is not configured - s.mockSdkSystemClient.On("DescribeWorkflowExecution", mock.Anything, "temporal-sys-add-search-attributes-workflow", "").Return( - &workflowservice.DescribeWorkflowExecutionResponse{}, nil).Once() s.mockResource.ESClient.EXPECT().GetMapping(gomock.Any(), "").Return(map[string]string{"col": "type"}, nil) s.mockResource.SearchAttributesProvider.EXPECT().GetSearchAttributes("", true).Return(searchattribute.TestNameTypeMap, nil).AnyTimes() diff --git a/service/history/fx.go b/service/history/fx.go index 7b399ad7a05..c68391bb9c1 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -28,7 +28,6 @@ import ( "context" "net" - sdkclient "go.temporal.io/sdk/client" "go.uber.org/fx" "google.golang.org/grpc" @@ -54,6 +53,7 @@ import ( "go.temporal.io/server/common/resolver" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/rpc/interceptor" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/service" "go.temporal.io/server/service/history/configs" @@ -287,7 +287,7 @@ func ReplicationTaskFetchersProvider( func ArchivalClientProvider( archiverProvider provider.ArchiverProvider, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, logger log.Logger, metricsClient metrics.Client, config *configs.Config, @@ -295,7 +295,7 @@ func ArchivalClientProvider( return warchiver.NewClient( metricsClient, logger, - publicClient, + sdkClientFactory, config.NumArchiveSystemWorkflows, config.ArchiveRequestRPS, archiverProvider, diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index baa205991a5..3840762441f 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -41,9 +41,9 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/tasks" @@ -110,7 +110,7 @@ type ( replicationTaskFetchers ReplicationTaskFetchers replicationTaskProcessorsLock sync.Mutex replicationTaskProcessors map[string]ReplicationTaskProcessor - publicClient sdkclient.Client + sdkClientFactory sdk.ClientFactory eventsReapplier nDCEventsReapplier matchingClient matchingservice.MatchingServiceClient rawMatchingClient matchingservice.MatchingServiceClient @@ -126,7 +126,7 @@ func NewEngineWithShardContext( visibilityMgr manager.VisibilityManager, matchingClient matchingservice.MatchingServiceClient, historyClient historyservice.HistoryServiceClient, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, eventNotifier events.Notifier, config *configs.Config, replicationTaskFetchers ReplicationTaskFetchers, @@ -163,7 +163,7 @@ func NewEngineWithShardContext( metricsClient: shard.GetMetricsClient(), eventNotifier: eventNotifier, config: config, - publicClient: publicClient, + sdkClientFactory: sdkClientFactory, matchingClient: matchingClient, rawMatchingClient: rawMatchingClient, replicationTaskProcessors: make(map[string]ReplicationTaskProcessor), diff --git a/service/history/historyEngineFactory.go b/service/history/historyEngineFactory.go index 5eeee6f146c..88772d5b110 100644 --- a/service/history/historyEngineFactory.go +++ b/service/history/historyEngineFactory.go @@ -25,10 +25,10 @@ package history import ( - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/queues" @@ -45,7 +45,7 @@ type ( VisibilityMgr manager.VisibilityManager MatchingClient resource.MatchingClient HistoryClient historyservice.HistoryServiceClient - PublicClient sdkclient.Client + SdkClientFactory sdk.ClientFactory EventNotifier events.Notifier Config *configs.Config ReplicationTaskFetchers ReplicationTaskFetchers @@ -68,7 +68,7 @@ func (f *historyEngineFactory) CreateEngine( f.VisibilityMgr, f.MatchingClient, f.HistoryClient, - f.PublicClient, + f.SdkClientFactory, f.EventNotifier, f.Config, f.ReplicationTaskFetchers, diff --git a/service/history/queueProcessorFactory.go b/service/history/queueProcessorFactory.go index 105e94a1688..87e9ab91389 100644 --- a/service/history/queueProcessorFactory.go +++ b/service/history/queueProcessorFactory.go @@ -25,12 +25,12 @@ package history import ( - sdkclient "go.temporal.io/sdk/client" "go.uber.org/fx" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/resource" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" @@ -58,10 +58,10 @@ type ( transferQueueProcessorFactoryParams struct { fx.In - ArchivalClient archiver.Client - PublicClient sdkclient.Client - MatchingClient resource.MatchingClient - HistoryClient historyservice.HistoryServiceClient + ArchivalClient archiver.Client + SdkClientFactory sdk.ClientFactory + MatchingClient resource.MatchingClient + HistoryClient historyservice.HistoryServiceClient } timerQueueProcessorFactoryParams struct { @@ -108,7 +108,7 @@ func (f *transferQueueProcessorFactory) CreateProcessor( engine, workflowCache, f.ArchivalClient, - f.PublicClient, + f.SdkClientFactory, f.MatchingClient, f.HistoryClient, ) diff --git a/service/history/transferQueueActiveProcessor.go b/service/history/transferQueueActiveProcessor.go index bc83a95d9ce..cc533b5081c 100644 --- a/service/history/transferQueueActiveProcessor.go +++ b/service/history/transferQueueActiveProcessor.go @@ -28,7 +28,6 @@ import ( "context" "github.com/pborman/uuid" - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -37,6 +36,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/workflow" @@ -62,7 +62,7 @@ func newTransferQueueActiveProcessor( shard shard.Context, workflowCache workflow.Cache, archivalClient archiver.Client, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, matchingClient matchingservice.MatchingServiceClient, historyClient historyservice.HistoryServiceClient, taskAllocator taskAllocator, @@ -115,7 +115,7 @@ func newTransferQueueActiveProcessor( shard, workflowCache, archivalClient, - publicClient, + sdkClientFactory, logger, config, matchingClient, @@ -158,7 +158,7 @@ func newTransferQueueFailoverProcessor( shard shard.Context, workflowCache workflow.Cache, archivalClient archiver.Client, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, matchingClient matchingservice.MatchingServiceClient, historyClient historyservice.HistoryServiceClient, namespaceIDs map[string]struct{}, @@ -228,7 +228,7 @@ func newTransferQueueFailoverProcessor( shard, workflowCache, archivalClient, - publicClient, + sdkClientFactory, logger, config, matchingClient, diff --git a/service/history/transferQueueActiveTaskExecutor.go b/service/history/transferQueueActiveTaskExecutor.go index 19cdd37ad08..d13ac4f9942 100644 --- a/service/history/transferQueueActiveTaskExecutor.go +++ b/service/history/transferQueueActiveTaskExecutor.go @@ -38,7 +38,6 @@ import ( taskqueuepb "go.temporal.io/api/taskqueue/v1" workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -53,6 +52,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/shard" @@ -77,7 +77,7 @@ func newTransferQueueActiveTaskExecutor( shard shard.Context, workflowCache workflow.Cache, archivalClient archiver.Client, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, logger log.Logger, config *configs.Config, matchingClient matchingservice.MatchingServiceClient, @@ -99,7 +99,7 @@ func newTransferQueueActiveTaskExecutor( parentClosePolicyClient: parentclosepolicy.NewClient( shard.GetMetricsClient(), shard.GetLogger(), - publicClient, + sdkClientFactory, config.NumParentClosePolicySystemWorkflows(), ), registry: shard.GetNamespaceRegistry(), diff --git a/service/history/transferQueueActiveTaskExecutor_test.go b/service/history/transferQueueActiveTaskExecutor_test.go index c1a01921c8b..d4ca16abd3d 100644 --- a/service/history/transferQueueActiveTaskExecutor_test.go +++ b/service/history/transferQueueActiveTaskExecutor_test.go @@ -226,7 +226,7 @@ func (s *transferQueueActiveTaskExecutorSuite) SetupTest() { s.mockShard, h.historyCache, s.mockArchivalClient, - h.publicClient, + h.sdkClientFactory, s.logger, config, s.mockShard.Resource.MatchingClient, diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index c901bdb2a7c..b9b13e311fc 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -33,8 +33,6 @@ import ( "sync/atomic" "time" - sdkclient "go.temporal.io/sdk/client" - "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/historyservice/v1" @@ -44,6 +42,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/queues" @@ -68,7 +67,7 @@ type ( historyEngine shard.Engine workflowCache workflow.Cache archivalClient archiver.Client - publicClient sdkclient.Client + sdkClientFactory sdk.ClientFactory taskAllocator taskAllocator config *configs.Config metricsClient metrics.Client @@ -90,7 +89,7 @@ func newTransferQueueProcessor( historyEngine shard.Engine, workflowCache workflow.Cache, archivalClient archiver.Client, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, matchingClient matchingservice.MatchingServiceClient, historyClient historyservice.HistoryServiceClient, ) queues.Processor { @@ -107,7 +106,7 @@ func newTransferQueueProcessor( historyEngine: historyEngine, workflowCache: workflowCache, archivalClient: archivalClient, - publicClient: publicClient, + sdkClientFactory: sdkClientFactory, taskAllocator: taskAllocator, config: config, metricsClient: shard.GetMetricsClient(), @@ -120,7 +119,7 @@ func newTransferQueueProcessor( shard, workflowCache, archivalClient, - publicClient, + sdkClientFactory, matchingClient, historyClient, taskAllocator, @@ -211,7 +210,7 @@ func (t *transferQueueProcessorImpl) FailoverNamespace( t.shard, t.workflowCache, t.archivalClient, - t.publicClient, + t.sdkClientFactory, t.matchingClient, t.historyClient, namespaceIDs, diff --git a/service/worker/archiver/client.go b/service/worker/archiver/client.go index 3744e93dbef..a95401e3956 100644 --- a/service/worker/archiver/client.go +++ b/service/worker/archiver/client.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/common/quotas" + "go.temporal.io/server/common/sdk" "go.temporal.io/server/common/searchattribute" ) @@ -99,7 +100,7 @@ type ( client struct { metricsScope metrics.Scope logger log.Logger - temporalClient sdkclient.Client + sdkClientFactory sdk.ClientFactory numWorkflows dynamicconfig.IntPropertyFn rateLimiter quotas.RateLimiter archiverProvider provider.ArchiverProvider @@ -126,16 +127,16 @@ const ( func NewClient( metricsClient metrics.Client, logger log.Logger, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, numWorkflows dynamicconfig.IntPropertyFn, requestRPS dynamicconfig.IntPropertyFn, archiverProvider provider.ArchiverProvider, ) Client { return &client{ - metricsScope: metricsClient.Scope(metrics.ArchiverClientScope), - logger: logger, - temporalClient: publicClient, - numWorkflows: numWorkflows, + metricsScope: metricsClient.Scope(metrics.ArchiverClientScope), + logger: logger, + sdkClientFactory: sdkClientFactory, + numWorkflows: numWorkflows, rateLimiter: quotas.NewDefaultOutgoingRateLimiter( func() float64 { return float64(requestRPS()) }, ), @@ -294,7 +295,9 @@ func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest, } signalCtx, cancel := context.WithTimeout(context.Background(), signalTimeout) defer cancel() - _, err := c.temporalClient.SignalWithStartWorkflow(signalCtx, workflowID, signalName, *request, workflowOptions, archivalWorkflowFnName, nil) + + sdkClient := c.sdkClientFactory.GetSystemClient(c.logger) + _, err := sdkClient.SignalWithStartWorkflow(signalCtx, workflowID, signalName, *request, workflowOptions, archivalWorkflowFnName, nil) if err != nil { taggedLogger.Error("failed to send signal to archival system workflow", tag.ArchivalRequestNamespaceID(request.NamespaceID), diff --git a/service/worker/archiver/client_test.go b/service/worker/archiver/client_test.go index d8c77872920..c37fadfba77 100644 --- a/service/worker/archiver/client_test.go +++ b/service/worker/archiver/client_test.go @@ -33,13 +33,14 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/mocks" + "go.temporal.io/sdk/mocks" carchiver "go.temporal.io/server/common/archiver" "go.temporal.io/server/common/archiver/provider" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/sdk" ) type clientSuite struct { @@ -53,6 +54,7 @@ type clientSuite struct { visibilityArchiver *carchiver.MockVisibilityArchiver metricsClient *metrics.MockClient metricsScope *metrics.MockScope + sdkClientFactory *sdk.MockClientFactory sdkClient *mocks.Client client *client } @@ -70,17 +72,18 @@ func (s *clientSuite) SetupTest() { s.visibilityArchiver = carchiver.NewMockVisibilityArchiver(s.controller) s.metricsClient = metrics.NewMockClient(s.controller) s.metricsScope = metrics.NewMockScope(s.controller) - s.sdkClient = &mocks.Client{} s.metricsClient.EXPECT().Scope(metrics.ArchiverClientScope, gomock.Any()).Return(s.metricsScope) + s.sdkClient = &mocks.Client{} + s.sdkClientFactory = sdk.NewMockClientFactory(s.controller) + s.sdkClientFactory.EXPECT().GetSystemClient(gomock.Any()).Return(s.sdkClient).AnyTimes() s.client = NewClient( s.metricsClient, log.NewNoopLogger(), - nil, + s.sdkClientFactory, dynamicconfig.GetIntPropertyFn(1000), dynamicconfig.GetIntPropertyFn(1000), s.archiverProvider, ).(*client) - s.client.temporalClient = s.sdkClient } func (s *clientSuite) TearDownTest() { diff --git a/service/worker/archiver/client_worker.go b/service/worker/archiver/client_worker.go index b31644cc81e..e250672ba08 100644 --- a/service/worker/archiver/client_worker.go +++ b/service/worker/archiver/client_worker.go @@ -29,7 +29,6 @@ import ( "time" "go.temporal.io/sdk/activity" - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" @@ -41,6 +40,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" + "go.temporal.io/server/common/sdk" ) type ( @@ -57,7 +57,7 @@ type ( // BootstrapContainer contains everything need for bootstrapping BootstrapContainer struct { - SdkSystemClient sdkclient.Client + SdkClientFactory sdk.ClientFactory MetricsClient metrics.Client Logger log.Logger HistoryV2Manager persistence.ExecutionManager @@ -105,6 +105,8 @@ func NewClientWorker(container *BootstrapContainer) ClientWorker { globalMetricsClient = container.MetricsClient globalConfig = container.Config actCtx := context.WithValue(context.Background(), bootstrapContainerKey, container) + + sdkClient := container.SdkClientFactory.GetSystemClient(container.Logger) wo := worker.Options{ MaxConcurrentActivityExecutionSize: container.Config.MaxConcurrentActivityExecutionSize(), MaxConcurrentWorkflowTaskExecutionSize: container.Config.MaxConcurrentWorkflowTaskExecutionSize(), @@ -113,7 +115,7 @@ func NewClientWorker(container *BootstrapContainer) ClientWorker { BackgroundActivityContext: actCtx, } clientWorker := &clientWorker{ - worker: worker.New(container.SdkSystemClient, workflowTaskQueue, wo), + worker: worker.New(sdkClient, workflowTaskQueue, wo), namespaceRegistry: container.NamespaceCache, } diff --git a/service/worker/batcher/batcher.go b/service/worker/batcher/batcher.go index 5fdf30ca782..acb9a9edb72 100644 --- a/service/worker/batcher/batcher.go +++ b/service/worker/batcher/batcher.go @@ -28,7 +28,6 @@ import ( "context" "go.temporal.io/sdk/activity" - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" @@ -52,7 +51,6 @@ type ( // It is also the context object that get's passed around within the scanner workflows / activities Batcher struct { cfg *Config - sdkSystemClient sdkclient.Client sdkClientFactory sdk.ClientFactory metricsClient metrics.Client logger log.Logger @@ -62,14 +60,12 @@ type ( // New returns a new instance of batcher daemon Batcher func New( cfg *Config, - sdkSystemClient sdkclient.Client, metricsClient metrics.Client, logger log.Logger, sdkClientFactory sdk.ClientFactory, ) *Batcher { return &Batcher{ cfg: cfg, - sdkSystemClient: sdkSystemClient, sdkClientFactory: sdkClientFactory, metricsClient: metricsClient, logger: log.With(logger, tag.ComponentBatcher), @@ -80,6 +76,7 @@ func New( func (s *Batcher) Start() error { // start worker for batch operation workflows ctx := context.WithValue(context.Background(), batcherContextKey, s) + workerOpts := worker.Options{ MaxConcurrentActivityExecutionSize: s.cfg.MaxConcurrentActivityExecutionSize(), MaxConcurrentWorkflowTaskExecutionSize: s.cfg.MaxConcurrentWorkflowTaskExecutionSize(), @@ -88,7 +85,9 @@ func (s *Batcher) Start() error { BackgroundActivityContext: ctx, } - batchWorker := worker.New(s.sdkSystemClient, BatcherTaskQueueName, workerOpts) + + sdkClient := s.sdkClientFactory.GetSystemClient(s.logger) + batchWorker := worker.New(sdkClient, BatcherTaskQueueName, workerOpts) batchWorker.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName}) batchWorker.RegisterActivityWithOptions(BatchActivity, activity.RegisterOptions{Name: batchActivityName}) diff --git a/service/worker/parentclosepolicy/client.go b/service/worker/parentclosepolicy/client.go index 7f89ec777f9..5f03dd044ae 100644 --- a/service/worker/parentclosepolicy/client.go +++ b/service/worker/parentclosepolicy/client.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/sdk" ) type ( @@ -47,10 +48,10 @@ type ( } clientImpl struct { - metricsClient metrics.Client - logger log.Logger - temporalClient sdkclient.Client - numWorkflows int + metricsClient metrics.Client + logger log.Logger + sdkClientFactory sdk.ClientFactory + numWorkflows int } ) @@ -67,14 +68,14 @@ const ( func NewClient( metricsClient metrics.Client, logger log.Logger, - publicClient sdkclient.Client, + sdkClientFactory sdk.ClientFactory, numWorkflows int, ) Client { return &clientImpl{ - metricsClient: metricsClient, - logger: logger, - temporalClient: publicClient, - numWorkflows: numWorkflows, + metricsClient: metricsClient, + logger: logger, + sdkClientFactory: sdkClientFactory, + numWorkflows: numWorkflows, } } @@ -88,7 +89,9 @@ func (c *clientImpl) SendParentClosePolicyRequest(request Request) error { } signalCtx, cancel := context.WithTimeout(context.Background(), signalTimeout) defer cancel() - _, err := c.temporalClient.SignalWithStartWorkflow(signalCtx, workflowID, processorChannelName, request, workflowOptions, processorWFTypeName, nil) + + sdkClient := c.sdkClientFactory.GetSystemClient(c.logger) + _, err := sdkClient.SignalWithStartWorkflow(signalCtx, workflowID, processorChannelName, request, workflowOptions, processorWFTypeName, nil) return err } diff --git a/service/worker/parentclosepolicy/processor.go b/service/worker/parentclosepolicy/processor.go index 241ad4ac610..7de234c6669 100644 --- a/service/worker/parentclosepolicy/processor.go +++ b/service/worker/parentclosepolicy/processor.go @@ -28,7 +28,6 @@ import ( "context" "go.temporal.io/sdk/activity" - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/workflow" "go.temporal.io/sdk/worker" @@ -38,6 +37,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/sdk" ) type ( @@ -54,7 +54,7 @@ type ( // the sub-system BootstrapParams struct { // SdkSystemClient is an instance of temporal service client - SdkSystemClient sdkclient.Client + SdkClientFactory sdk.ClientFactory // MetricsClient is an instance of metrics object for emitting stats MetricsClient metrics.Client // Logger is the logger @@ -69,30 +69,31 @@ type ( // Processor is the background sub-system that execute workflow for ParentClosePolicy Processor struct { - svcClient sdkclient.Client - clientBean client.Bean - metricsClient metrics.Client - cfg Config - logger log.Logger - currentCluster string + svcClientFactory sdk.ClientFactory + clientBean client.Bean + metricsClient metrics.Client + cfg Config + logger log.Logger + currentCluster string } ) // New returns a new instance as daemon func New(params *BootstrapParams) *Processor { return &Processor{ - svcClient: params.SdkSystemClient, - metricsClient: params.MetricsClient, - cfg: params.Config, - logger: log.With(params.Logger, tag.ComponentBatcher), - clientBean: params.ClientBean, - currentCluster: params.CurrentCluster, + svcClientFactory: params.SdkClientFactory, + metricsClient: params.MetricsClient, + cfg: params.Config, + logger: log.With(params.Logger, tag.ComponentBatcher), + clientBean: params.ClientBean, + currentCluster: params.CurrentCluster, } } // Start starts the scanner func (s *Processor) Start() error { - processorWorker := worker.New(s.svcClient, processorTaskQueueName, getWorkerOptions(s)) + svcClient := s.svcClientFactory.GetSystemClient(s.logger) + processorWorker := worker.New(svcClient, processorTaskQueueName, getWorkerOptions(s)) processorWorker.RegisterWorkflowWithOptions(ProcessorWorkflow, workflow.RegisterOptions{Name: processorWFTypeName}) processorWorker.RegisterActivityWithOptions(ProcessorActivity, activity.RegisterOptions{Name: processorActivityName}) diff --git a/service/worker/service.go b/service/worker/service.go index 04cbe271f11..f52178e2a7e 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -30,7 +30,6 @@ import ( "time" "go.temporal.io/api/serviceerror" - sdkclient "go.temporal.io/sdk/client" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/client" @@ -88,7 +87,6 @@ type ( status int32 stopC chan struct{} sdkClientFactory sdk.ClientFactory - sdkSystemClient sdkclient.Client esClient esclient.Client config *Config @@ -113,7 +111,6 @@ func NewService( logger resource.SnTaggedLogger, serviceConfig *Config, sdkClientFactory sdk.ClientFactory, - sdkSystemClient sdkclient.Client, esClient esclient.Client, archivalMetadata carchiver.ArchivalMetadata, clusterMetadata cluster.Metadata, @@ -140,7 +137,6 @@ func NewService( return &Service{ status: common.DaemonStatusInitialized, config: serviceConfig, - sdkSystemClient: sdkSystemClient, sdkClientFactory: sdkClientFactory, esClient: esClient, stopC: make(chan struct{}), @@ -388,12 +384,12 @@ func (s *Service) Stop() { func (s *Service) startParentClosePolicyProcessor() { params := &parentclosepolicy.BootstrapParams{ - Config: *s.config.ParentCloseCfg, - SdkSystemClient: s.sdkSystemClient, - MetricsClient: s.metricsClient, - Logger: s.logger, - ClientBean: s.clientBean, - CurrentCluster: s.clusterMetadata.GetCurrentClusterName(), + Config: *s.config.ParentCloseCfg, + SdkClientFactory: s.sdkClientFactory, + MetricsClient: s.metricsClient, + Logger: s.logger, + ClientBean: s.clientBean, + CurrentCluster: s.clusterMetadata.GetCurrentClusterName(), } processor := parentclosepolicy.New(params) if err := processor.Start(); err != nil { @@ -407,7 +403,6 @@ func (s *Service) startParentClosePolicyProcessor() { func (s *Service) startBatcher() { if err := batcher.New( s.config.BatcherCfg, - s.sdkSystemClient, s.metricsClient, s.logger, s.sdkClientFactory).Start(); err != nil { @@ -422,7 +417,7 @@ func (s *Service) startScanner() { sc := scanner.New( s.logger, s.config.ScannerCfg, - s.sdkSystemClient, + s.sdkClientFactory.GetSystemClient(s.logger), s.metricsClient, s.executionManager, s.taskManager, @@ -456,13 +451,13 @@ func (s *Service) startReplicator() { func (s *Service) startArchiver() { bc := &archiver.BootstrapContainer{ - SdkSystemClient: s.sdkSystemClient, MetricsClient: s.metricsClient, Logger: s.logger, HistoryV2Manager: s.executionManager, NamespaceCache: s.namespaceRegistry, Config: s.config.ArchiverConfig, ArchiverProvider: s.archiverProvider, + SdkClientFactory: s.sdkClientFactory, } clientWorker := archiver.NewClientWorker(bc) if err := clientWorker.Start(); err != nil { diff --git a/service/worker/worker.go b/service/worker/worker.go index 48cbf4f306c..ab1dd92e4c5 100644 --- a/service/worker/worker.go +++ b/service/worker/worker.go @@ -27,13 +27,13 @@ package worker import ( "sync/atomic" - sdkclient "go.temporal.io/sdk/client" sdkworker "go.temporal.io/sdk/worker" "go.uber.org/fx" "go.temporal.io/server/common" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" + "go.temporal.io/server/common/sdk" workercommon "go.temporal.io/server/service/worker/common" ) @@ -44,7 +44,7 @@ type ( workerManager struct { status int32 logger log.Logger - sdkClient sdkclient.Client + sdkClientFactory sdk.ClientFactory workers []sdkworker.Worker workerComponents []workercommon.WorkerComponent } @@ -52,7 +52,7 @@ type ( initParams struct { fx.In Logger log.Logger - SdkSystemClient sdkclient.Client + SdkClientFactory sdk.ClientFactory WorkerComponents []workercommon.WorkerComponent `group:"workerComponent"` } ) @@ -60,7 +60,7 @@ type ( func NewWorkerManager(params initParams) *workerManager { return &workerManager{ logger: params.Logger, - sdkClient: params.SdkSystemClient, + sdkClientFactory: params.SdkClientFactory, workerComponents: params.WorkerComponents, } } @@ -77,7 +77,8 @@ func (wm *workerManager) Start() { defaultWorkerOptions := sdkworker.Options{ // TODO: add dynamic config for worker options } - defaultWorker := sdkworker.New(wm.sdkClient, DefaultWorkerTaskQueue, defaultWorkerOptions) + sdkClient := wm.sdkClientFactory.GetSystemClient(wm.logger) + defaultWorker := sdkworker.New(sdkClient, DefaultWorkerTaskQueue, defaultWorkerOptions) wm.workers = []sdkworker.Worker{defaultWorker} for _, wc := range wm.workerComponents { @@ -87,7 +88,7 @@ func (wm *workerManager) Start() { wc.Register(defaultWorker) } else { // this worker component requires a dedicated worker - dedicatedWorker := sdkworker.New(wm.sdkClient, workerOptions.TaskQueue, workerOptions.Options) + dedicatedWorker := sdkworker.New(sdkClient, workerOptions.TaskQueue, workerOptions.Options) wc.Register(dedicatedWorker) wm.workers = append(wm.workers, dedicatedWorker) } diff --git a/tools/cli/factory.go b/tools/cli/factory.go index 71d6c9be09e..d2bb3f0068d 100644 --- a/tools/cli/factory.go +++ b/tools/cli/factory.go @@ -123,8 +123,7 @@ func (b *clientFactory) SDKClient(c *cli.Context, namespace string) sdkclient.Cl Logger: log.NewSdkLogger(b.logger), Identity: getCliIdentity(), ConnectionOptions: sdkclient.ConnectionOptions{ - DisableHealthCheck: true, - TLS: tlsConfig, + TLS: tlsConfig, }, HeadersProvider: headersprovider.GetCurrent(), })