From 2e099103706d8fa36ca14a6d3ce0dcb0336e9591 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Tue, 2 Apr 2024 19:51:32 -0700 Subject: [PATCH 1/4] add retries and backoffs for propeller sending events to admin Signed-off-by: Paul Dittamo --- flytepropeller/events/admin_eventsink.go | 16 +++++++++++++--- flytepropeller/events/config.go | 20 +++++++++++++------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/flytepropeller/events/admin_eventsink.go b/flytepropeller/events/admin_eventsink.go index df75348b34..f3b7022c68 100644 --- a/flytepropeller/events/admin_eventsink.go +++ b/flytepropeller/events/admin_eventsink.go @@ -3,8 +3,10 @@ package events import ( "context" "fmt" + "time" "github.com/golang/protobuf/proto" + grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/propagation" "golang.org/x/time/rate" @@ -128,15 +130,23 @@ func IDFromMessage(message proto.Message) ([]byte, error) { return []byte(id), nil } -func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) { +func initializeAdminClientFromConfig(ctx context.Context, config *Config) (client service.AdminServiceClient, err error) { cfg := admin2.GetConfig(ctx) tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer) - opt := grpc.WithUnaryInterceptor( + + grpcOptions := []grpcRetry.CallOption{ + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.BackoffJitter)), + grpcRetry.WithMax(config.MaxRetries), + } + + opt := grpc.WithChainUnaryInterceptor( otelgrpc.UnaryClientInterceptor( otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagation.TraceContext{}), ), + grpcRetry.UnaryClientInterceptor(grpcOptions...), ) + clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx) if err != nil { return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err) @@ -152,7 +162,7 @@ func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Sco case EventSinkFile: return NewFileSink(config.FilePath) case EventSinkAdmin: - adminClient, err := initializeAdminClientFromConfig(ctx) + adminClient, err := initializeAdminClientFromConfig(ctx, config) if err != nil { return nil, err } diff --git a/flytepropeller/events/config.go b/flytepropeller/events/config.go index 895ae009b3..bd883df2bc 100644 --- a/flytepropeller/events/config.go +++ b/flytepropeller/events/config.go @@ -21,17 +21,23 @@ const ( ) type Config struct { - Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."` - FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."` - Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."` - Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."` + Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."` + FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."` + Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."` + Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."` + MaxRetries uint `json:"max-retries" pflag:",The max number of retries for event recording."` + BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."` + BackoffJitter float64 `json:"backoff-jitter" pflag:",The jitter factor for event recording retries."` } var ( defaultConfig = Config{ - Rate: int64(500), - Capacity: 1000, - Type: EventSinkAdmin, + Rate: int64(500), + Capacity: 1000, + Type: EventSinkAdmin, + MaxRetries: 5, + BackoffScalar: 100, + BackoffJitter: 0.1, } configSection = config.MustRegisterSection(configSectionKey, &defaultConfig) From e4694dbd9c51fa9cc1d9ffd16f72c2594b2de87b Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 3 Apr 2024 18:27:11 -0700 Subject: [PATCH 2/4] update docs Signed-off-by: Paul Dittamo --- .../generated/flytepropeller_config.rst | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/docs/deployment/configuration/generated/flytepropeller_config.rst b/docs/deployment/configuration/generated/flytepropeller_config.rst index b8fbe49f4e..2f049c05ae 100644 --- a/docs/deployment/configuration/generated/flytepropeller_config.rst +++ b/docs/deployment/configuration/generated/flytepropeller_config.rst @@ -671,6 +671,42 @@ The max bucket size for event recording tokens. "1000" +max-retries (uint) +------------------------------------------------------------------------------------------------------------------------ + +The max number of retries for event recording. + +**Default Value**: + +.. code-block:: yaml + + "5" + + +base-scalar (int) +------------------------------------------------------------------------------------------------------------------------ + +The base/scalar backoff duration in milliseconds for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "100" + + +backoff-jitter (float64) +------------------------------------------------------------------------------------------------------------------------ + +The jitter factor for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "0.1" + + Section: logger ======================================================================================================================== From cc3522d04f6a86c0ac8f8783beb54dfa56c4a04e Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Mon, 8 Apr 2024 22:51:43 -0700 Subject: [PATCH 3/4] update retry types Signed-off-by: Paul Dittamo --- flytepropeller/events/admin_eventsink.go | 4 +-- flytepropeller/events/config.go | 27 +++++++++++--- flytepropeller/events/config_flags.go | 3 ++ flytepropeller/events/config_flags_test.go | 42 ++++++++++++++++++++++ 4 files changed, 70 insertions(+), 6 deletions(-) diff --git a/flytepropeller/events/admin_eventsink.go b/flytepropeller/events/admin_eventsink.go index f3b7022c68..cb4b88a69a 100644 --- a/flytepropeller/events/admin_eventsink.go +++ b/flytepropeller/events/admin_eventsink.go @@ -135,8 +135,8 @@ func initializeAdminClientFromConfig(ctx context.Context, config *Config) (clien tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer) grpcOptions := []grpcRetry.CallOption{ - grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.BackoffJitter)), - grpcRetry.WithMax(config.MaxRetries), + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.GetBackoffJitter(ctx))), + grpcRetry.WithMax(uint(config.MaxRetries)), } opt := grpc.WithChainUnaryInterceptor( diff --git a/flytepropeller/events/config.go b/flytepropeller/events/config.go index bd883df2bc..0e6d5a377c 100644 --- a/flytepropeller/events/config.go +++ b/flytepropeller/events/config.go @@ -2,6 +2,7 @@ package events import ( "context" + "strconv" "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/logger" @@ -25,9 +26,9 @@ type Config struct { FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."` Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."` Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."` - MaxRetries uint `json:"max-retries" pflag:",The max number of retries for event recording."` + MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."` BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."` - BackoffJitter float64 `json:"backoff-jitter" pflag:",The jitter factor for event recording retries."` + BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."` } var ( @@ -37,12 +38,30 @@ var ( Type: EventSinkAdmin, MaxRetries: 5, BackoffScalar: 100, - BackoffJitter: 0.1, + BackoffJitter: "0.1", } - configSection = config.MustRegisterSection(configSectionKey, &defaultConfig) + configSection = config.MustRegisterSectionWithUpdates(configSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) { + if newValue.(*Config).MaxRetries < 0 { + logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.") + } + + if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 { + logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter) + } + }) ) +func (c Config) GetBackoffJitter(ctx context.Context) float64 { + jitter, err := strconv.ParseFloat(c.BackoffJitter, 64) + if err != nil { + logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err) + return 0.1 + } + + return jitter +} + // GetConfig Retrieves current global config for storage. func GetConfig(ctx context.Context) *Config { if c, ok := configSection.GetConfig().(*Config); ok { diff --git a/flytepropeller/events/config_flags.go b/flytepropeller/events/config_flags.go index 36e047e0aa..5d18653b45 100755 --- a/flytepropeller/events/config_flags.go +++ b/flytepropeller/events/config_flags.go @@ -54,5 +54,8 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "file-path"), defaultConfig.FilePath, "For file types, specify where the file should be located.") cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "rate"), defaultConfig.Rate, "Max rate at which events can be recorded per second.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "capacity"), defaultConfig.Capacity, "The max bucket size for event recording tokens.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-retries"), defaultConfig.MaxRetries, "The max number of retries for event recording.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "base-scalar"), defaultConfig.BackoffScalar, "The base/scalar backoff duration in milliseconds for event recording retries.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "backoff-jitter"), defaultConfig.BackoffJitter, "A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.") return cmdFlags } diff --git a/flytepropeller/events/config_flags_test.go b/flytepropeller/events/config_flags_test.go index e054350031..d55e39cbfd 100755 --- a/flytepropeller/events/config_flags_test.go +++ b/flytepropeller/events/config_flags_test.go @@ -155,4 +155,46 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_max-retries", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("max-retries", testValue) + if vInt, err := cmdFlags.GetInt("max-retries"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.MaxRetries) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_base-scalar", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("base-scalar", testValue) + if vInt, err := cmdFlags.GetInt("base-scalar"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.BackoffScalar) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_backoff-jitter", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("backoff-jitter", testValue) + if vString, err := cmdFlags.GetString("backoff-jitter"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.BackoffJitter) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } From 5ba76af86039bb037f66659b3c448f38bae89669 Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Tue, 9 Apr 2024 01:26:33 -0700 Subject: [PATCH 4/4] support expontential backoffs w/ jitter for catalog connection Signed-off-by: Paul Dittamo --- .../generated/flytepropeller_config.rst | 42 ++++++++++++++++-- .../pkg/controller/nodes/catalog/config.go | 44 +++++++++++++++---- .../controller/nodes/catalog/config_flags.go | 3 ++ .../nodes/catalog/config_flags_test.go | 42 ++++++++++++++++++ .../nodes/catalog/datacatalog/datacatalog.go | 7 +-- 5 files changed, 123 insertions(+), 15 deletions(-) diff --git a/docs/deployment/configuration/generated/flytepropeller_config.rst b/docs/deployment/configuration/generated/flytepropeller_config.rst index 2f049c05ae..c0a6e90fb9 100644 --- a/docs/deployment/configuration/generated/flytepropeller_config.rst +++ b/docs/deployment/configuration/generated/flytepropeller_config.rst @@ -608,6 +608,42 @@ Use the same gRPC credentials option as the flyteadmin client "false" +max-retries (int) +------------------------------------------------------------------------------------------------------------------------ + +The max number of retries for event recording. + +**Default Value**: + +.. code-block:: yaml + + "5" + + +base-scalar (int) +------------------------------------------------------------------------------------------------------------------------ + +The base/scalar backoff duration in milliseconds for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "100" + + +backoff-jitter (string) +------------------------------------------------------------------------------------------------------------------------ + +A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "0.1" + + default-service-config (string) ------------------------------------------------------------------------------------------------------------------------ @@ -671,7 +707,7 @@ The max bucket size for event recording tokens. "1000" -max-retries (uint) +max-retries (int) ------------------------------------------------------------------------------------------------------------------------ The max number of retries for event recording. @@ -695,10 +731,10 @@ The base/scalar backoff duration in milliseconds for event recording retries. "100" -backoff-jitter (float64) +backoff-jitter (string) ------------------------------------------------------------------------------------------------------------------------ -The jitter factor for event recording retries. +A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries. **Default Value**: diff --git a/flytepropeller/pkg/controller/nodes/catalog/config.go b/flytepropeller/pkg/controller/nodes/catalog/config.go index 2b0c484fb7..4dd7bc70ae 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/config.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config.go @@ -3,12 +3,14 @@ package catalog import ( "context" "fmt" + "strconv" "google.golang.org/grpc" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/catalog/datacatalog" "github.com/flyteorg/flyte/flytestdlib/config" + "github.com/flyteorg/flyte/flytestdlib/logger" ) //go:generate pflags Config --default-var defaultConfig @@ -16,11 +18,22 @@ import ( const ConfigSectionKey = "catalog-cache" var ( - defaultConfig = &Config{ - Type: NoOpDiscoveryType, + defaultConfig = Config{ + Type: NoOpDiscoveryType, + MaxRetries: 5, + BackoffScalar: 100, + BackoffJitter: "0.1", } - configSection = config.MustRegisterSection(ConfigSectionKey, defaultConfig) + configSection = config.MustRegisterSectionWithUpdates(ConfigSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) { + if newValue.(*Config).MaxRetries < 0 { + logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.") + } + + if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 { + logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter) + } + }) ) type DiscoveryType = string @@ -31,11 +44,14 @@ const ( ) type Config struct { - Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` - Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` - Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` - MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"` - UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"` + Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` + Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` + Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` + MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"` + UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"` + MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."` + BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."` + BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."` // Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md // eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]} @@ -44,6 +60,16 @@ type Config struct { DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"` } +func (c Config) GetBackoffJitter(ctx context.Context) float64 { + jitter, err := strconv.ParseFloat(c.BackoffJitter, 64) + if err != nil { + logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err) + return 0.1 + } + + return jitter +} + // GetConfig gets loaded config for Discovery func GetConfig() *Config { return configSection.GetConfig().(*Config) @@ -56,7 +82,7 @@ func NewCatalogClient(ctx context.Context, authOpt ...grpc.DialOption) (catalog. case DataCatalogType: return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig, - authOpt...) + uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...) case NoOpDiscoveryType, "": return NOOPCatalog{}, nil } diff --git a/flytepropeller/pkg/controller/nodes/catalog/config_flags.go b/flytepropeller/pkg/controller/nodes/catalog/config_flags.go index 4bd40b20b4..1f9a748b2d 100755 --- a/flytepropeller/pkg/controller/nodes/catalog/config_flags.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config_flags.go @@ -55,6 +55,9 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "insecure"), defaultConfig.Insecure, " Use insecure grpc connection") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "max-cache-age"), defaultConfig.MaxCacheAge.String(), " Cache entries past this age will incur cache miss. 0 means cache never expires") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "use-admin-auth"), defaultConfig.UseAdminAuth, " Use the same gRPC credentials option as the flyteadmin client") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-retries"), defaultConfig.MaxRetries, "The max number of retries for event recording.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "base-scalar"), defaultConfig.BackoffScalar, "The base/scalar backoff duration in milliseconds for event recording retries.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "backoff-jitter"), defaultConfig.BackoffJitter, "A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-service-config"), defaultConfig.DefaultServiceConfig, " Set the default service config for the catalog gRPC client") return cmdFlags } diff --git a/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go b/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go index 3b18a9282d..2e7537e981 100755 --- a/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go @@ -169,6 +169,48 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_max-retries", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("max-retries", testValue) + if vInt, err := cmdFlags.GetInt("max-retries"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.MaxRetries) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_base-scalar", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("base-scalar", testValue) + if vInt, err := cmdFlags.GetInt("base-scalar"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.BackoffScalar) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_backoff-jitter", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("backoff-jitter", testValue) + if vString, err := cmdFlags.GetString("backoff-jitter"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.BackoffJitter) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_default-service-config", func(t *testing.T) { t.Run("Override", func(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go index 6a029255a8..a04623f88a 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go +++ b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go @@ -452,16 +452,17 @@ func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, } // NewDataCatalog creates a new Datacatalog client for task execution caching -func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CatalogClient, error) { +func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, + useAdminAuth bool, defaultServiceConfig string, maxRetries uint, backoffScalar int, backoffJitter float64, authOpt ...grpc.DialOption) (*CatalogClient, error) { var opts []grpc.DialOption if useAdminAuth && authOpt != nil { opts = append(opts, authOpt...) } grpcOptions := []grpcRetry.CallOption{ - grpcRetry.WithBackoff(grpcRetry.BackoffLinear(100 * time.Millisecond)), + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(backoffScalar)*time.Millisecond, backoffJitter)), grpcRetry.WithCodes(codes.DeadlineExceeded, codes.Unavailable, codes.Canceled), - grpcRetry.WithMax(5), + grpcRetry.WithMax(maxRetries), } if insecureConnection {