Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing SdkClient injection with SdkClientFactory injection. #2611

Merged
merged 4 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ integration-test: clean-test-results
@! grep -q "^--- FAIL" test.log

integration-with-fault-injection-test: clean-test-results
@printf $(COLOR) "Run integration tests..."
@printf $(COLOR) "Run integration tests with fault injection..."
$(foreach INTEG_TEST_DIR,$(INTEG_TEST_DIRS),\
@go test $(INTEG_TEST_DIR) -timeout=$(TEST_TIMEOUT) $(TEST_TAG) -race -PersistenceFaultInjectionRate=0.005 | tee -a test.log \
$(NEWLINE))
Expand Down
1 change: 0 additions & 1 deletion common/resource/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ var Module = fx.Options(
membership.MonitorLifetimeHooksModule,
fx.Provide(ClientFactoryProvider),
fx.Provide(ClientBeanProvider),
sdk.Module,
fx.Provide(SdkClientFactoryProvider),
fx.Provide(FrontedClientProvider),
fx.Provide(GrpcListenerProvider),
Expand Down
65 changes: 45 additions & 20 deletions common/sdk/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,28 @@ package sdk
import (
"crypto/tls"
"fmt"
"sync"

sdkclient "go.temporal.io/sdk/client"

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
)

type (
ClientFactory interface {
NewClient(namespaceName string, logger log.Logger) (sdkclient.Client, error)
NewSystemClient(logger log.Logger) (sdkclient.Client, error)
GetSystemClient(logger log.Logger) sdkclient.Client
}

clientFactory struct {
hostPort string
tlsConfig *tls.Config
metricsHandler *MetricsHandler
hostPort string
tlsConfig *tls.Config
metricsHandler *MetricsHandler
systemSdkClient sdkclient.Client
once *sync.Once
}
)

Expand All @@ -56,27 +61,47 @@ func NewClientFactory(hostPort string, tlsConfig *tls.Config, metricsHandler *Me
hostPort: hostPort,
tlsConfig: tlsConfig,
metricsHandler: metricsHandler,
once: &sync.Once{},
}
}

func (f *clientFactory) NewClient(namespaceName string, logger log.Logger) (sdkclient.Client, error) {
sdkClient, err := sdkclient.NewClient(sdkclient.Options{
HostPort: f.hostPort,
Namespace: namespaceName,
MetricsHandler: f.metricsHandler,
Logger: log.NewSdkLogger(logger),
ConnectionOptions: sdkclient.ConnectionOptions{
TLS: f.tlsConfig,
DisableHealthCheck: true,
},
})
if err != nil {
return nil, fmt.Errorf("unable to create SDK client: %w", err)
}
var client sdkclient.Client

// Retry for up to 1m, handles frontend service not ready
err := backoff.Retry(func() error {
sdkClient, err := sdkclient.NewClient(sdkclient.Options{
yiminc marked this conversation as resolved.
Show resolved Hide resolved
HostPort: f.hostPort,
Namespace: namespaceName,
MetricsHandler: f.metricsHandler,
Logger: log.NewSdkLogger(logger),
ConnectionOptions: sdkclient.ConnectionOptions{
TLS: f.tlsConfig,
},
})
if err != nil {
return fmt.Errorf("unable to create SDK client: %w", err)
}

client = sdkClient
return nil
}, common.CreateSdkClientFactoryRetryPolicy(), common.IsContextDeadlineExceededErr)

return sdkClient, nil
return client, err
}

func (f *clientFactory) NewSystemClient(logger log.Logger) (sdkclient.Client, error) {
return f.NewClient(common.SystemLocalNamespace, logger)
func (f *clientFactory) GetSystemClient(logger log.Logger) sdkclient.Client {
f.once.Do(func() {
client, err := f.NewClient(common.SystemLocalNamespace, logger)

if err != nil {
logger.Fatal(
"error getting system sdk client",
tag.Error(err),
)
}

f.systemSdkClient = client
})
return f.systemSdkClient
}
27 changes: 13 additions & 14 deletions common/sdk/factory_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 0 additions & 40 deletions common/sdk/fx.go

This file was deleted.

11 changes: 11 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ const (
replicationServiceBusyMaxInterval = 10 * time.Second
replicationServiceBusyExpirationInterval = 30 * time.Second

sdkClientFactoryRetryExpirationInterval = time.Minute

defaultInitialInterval = time.Second
defaultMaximumIntervalCoefficient = 100.0
defaultBackoffCoefficient = 2.0
Expand Down Expand Up @@ -202,6 +204,15 @@ func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy {
return policy
}

// CreateSdkClientFactoryRetryPolicy creates a retry policy to handle SdkClientFactory NewClient when frontend service is not ready
func CreateSdkClientFactoryRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(frontendServiceOperationInitialInterval)
policy.SetMaximumInterval(frontendServiceOperationMaxInterval)
policy.SetExpirationInterval(sdkClientFactoryRetryExpirationInterval)

return policy
}

// IsPersistenceTransientError checks if the error is a transient persistence error
func IsPersistenceTransientError(err error) bool {
switch err.(type) {
Expand Down
6 changes: 0 additions & 6 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ func (s *clientIntegrationSuite) SetupTest() {
sdkClient, err := sdkclient.NewClient(sdkclient.Options{
HostPort: s.hostPort,
Namespace: s.namespace,
ConnectionOptions: sdkclient.ConnectionOptions{
DisableHealthCheck: true,
},
})
if err != nil {
s.Logger.Fatal("Error when creating SDK client", tag.Error(err))
Expand Down Expand Up @@ -249,9 +246,6 @@ func (s *clientIntegrationSuite) startWorkerWithDataConverter(tl string, dataCon
HostPort: s.hostPort,
Namespace: s.namespace,
DataConverter: dataConverter,
ConnectionOptions: sdkclient.ConnectionOptions{
DisableHealthCheck: true,
},
})
if err != nil {
s.Logger.Fatal("Error when creating SDK client", tag.Error(err))
Expand Down
14 changes: 9 additions & 5 deletions service/frontend/adminHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/rpc/interceptor"
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/xdc"
"go.temporal.io/server/service/history/tasks"
Expand Down Expand Up @@ -106,7 +107,7 @@ type (
clientFactory serverClient.Factory
clientBean serverClient.Bean
historyClient historyservice.HistoryServiceClient
sdkClient sdkclient.Client
sdkClientFactory sdk.ClientFactory
membershipMonitor membership.Monitor
metricsClient metrics.Client
namespaceRegistry namespace.Registry
Expand All @@ -132,7 +133,7 @@ type (
ClientFactory serverClient.Factory
ClientBean serverClient.Bean
HistoryClient historyservice.HistoryServiceClient
SdkSystemClient sdkclient.Client
sdkClientFactory sdk.ClientFactory
MembershipMonitor membership.Monitor
ArchiverProvider provider.ArchiverProvider
MetricsClient metrics.Client
Expand Down Expand Up @@ -193,7 +194,7 @@ func NewAdminHandler(
clientFactory: args.ClientFactory,
clientBean: args.ClientBean,
historyClient: args.HistoryClient,
sdkClient: args.SdkSystemClient,
sdkClientFactory: args.sdkClientFactory,
membershipMonitor: args.MembershipMonitor,
metricsClient: args.MetricsClient,
namespaceRegistry: args.NamespaceRegistry,
Expand Down Expand Up @@ -278,7 +279,8 @@ func (adh *AdminHandler) AddSearchAttributes(ctx context.Context, request *admin
SkipSchemaUpdate: request.GetSkipSchemaUpdate(),
}

run, err := adh.sdkClient.ExecuteWorkflow(
sdkClient := adh.sdkClientFactory.GetSystemClient(adh.logger)
run, err := sdkClient.ExecuteWorkflow(
ctx,
sdkclient.StartWorkflowOptions{
TaskQueue: worker.DefaultWorkerTaskQueue,
Expand Down Expand Up @@ -376,7 +378,9 @@ func (adh *AdminHandler) GetSearchAttributes(ctx context.Context, request *admin

func (adh *AdminHandler) getSearchAttributes(ctx context.Context, indexName string, runID string) (*adminservice.GetSearchAttributesResponse, error) {
var lastErr error
descResp, err := adh.sdkClient.DescribeWorkflowExecution(ctx, addsearchattributes.WorkflowName, runID)

sdkClient := adh.sdkClientFactory.GetSystemClient(adh.logger)
descResp, err := sdkClient.DescribeWorkflowExecution(ctx, addsearchattributes.WorkflowName, runID)
var wfInfo *workflowpb.WorkflowExecutionInfo
if err != nil {
// NotFound can happen when no search attributes were added and the workflow has never been executed.
Expand Down
Loading