diff --git a/docs/deployment/configuration/generated/flytepropeller_config.rst b/docs/deployment/configuration/generated/flytepropeller_config.rst index 6ddf08273c..45569a6016 100644 --- a/docs/deployment/configuration/generated/flytepropeller_config.rst +++ b/docs/deployment/configuration/generated/flytepropeller_config.rst @@ -608,6 +608,42 @@ Use the same gRPC credentials option as the flyteadmin client "false" +max-retries (int) +------------------------------------------------------------------------------------------------------------------------ + +The max number of retries for event recording. + +**Default Value**: + +.. code-block:: yaml + + "5" + + +base-scalar (int) +------------------------------------------------------------------------------------------------------------------------ + +The base/scalar backoff duration in milliseconds for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "100" + + +backoff-jitter (string) +------------------------------------------------------------------------------------------------------------------------ + +A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "0.1" + + default-service-config (string) ------------------------------------------------------------------------------------------------------------------------ @@ -671,6 +707,42 @@ The max bucket size for event recording tokens. "1000" +max-retries (int) +------------------------------------------------------------------------------------------------------------------------ + +The max number of retries for event recording. + +**Default Value**: + +.. code-block:: yaml + + "5" + + +base-scalar (int) +------------------------------------------------------------------------------------------------------------------------ + +The base/scalar backoff duration in milliseconds for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "100" + + +backoff-jitter (string) +------------------------------------------------------------------------------------------------------------------------ + +A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "0.1" + + Section: logger ======================================================================================================================== diff --git a/flytepropeller/events/admin_eventsink.go b/flytepropeller/events/admin_eventsink.go index df75348b34..cb4b88a69a 100644 --- a/flytepropeller/events/admin_eventsink.go +++ b/flytepropeller/events/admin_eventsink.go @@ -3,8 +3,10 @@ package events import ( "context" "fmt" + "time" "github.com/golang/protobuf/proto" + grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/propagation" "golang.org/x/time/rate" @@ -128,15 +130,23 @@ func IDFromMessage(message proto.Message) ([]byte, error) { return []byte(id), nil } -func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) { +func initializeAdminClientFromConfig(ctx context.Context, config *Config) (client service.AdminServiceClient, err error) { cfg := admin2.GetConfig(ctx) tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer) - opt := grpc.WithUnaryInterceptor( + + grpcOptions := []grpcRetry.CallOption{ + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.GetBackoffJitter(ctx))), + grpcRetry.WithMax(uint(config.MaxRetries)), + } + + opt := grpc.WithChainUnaryInterceptor( otelgrpc.UnaryClientInterceptor( otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagation.TraceContext{}), ), + grpcRetry.UnaryClientInterceptor(grpcOptions...), ) + clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx) if err != nil { return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err) @@ -152,7 +162,7 @@ func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Sco case EventSinkFile: return NewFileSink(config.FilePath) case EventSinkAdmin: - adminClient, err := initializeAdminClientFromConfig(ctx) + adminClient, err := initializeAdminClientFromConfig(ctx, config) if err != nil { return nil, err } diff --git a/flytepropeller/events/config.go b/flytepropeller/events/config.go index 895ae009b3..0e6d5a377c 100644 --- a/flytepropeller/events/config.go +++ b/flytepropeller/events/config.go @@ -2,6 +2,7 @@ package events import ( "context" + "strconv" "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/logger" @@ -21,22 +22,46 @@ const ( ) type Config struct { - Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."` - FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."` - Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."` - Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."` + Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."` + FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."` + Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."` + Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."` + MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."` + BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."` + BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."` } var ( defaultConfig = Config{ - Rate: int64(500), - Capacity: 1000, - Type: EventSinkAdmin, + Rate: int64(500), + Capacity: 1000, + Type: EventSinkAdmin, + MaxRetries: 5, + BackoffScalar: 100, + BackoffJitter: "0.1", } - configSection = config.MustRegisterSection(configSectionKey, &defaultConfig) + configSection = config.MustRegisterSectionWithUpdates(configSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) { + if newValue.(*Config).MaxRetries < 0 { + logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.") + } + + if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 { + logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter) + } + }) ) +func (c Config) GetBackoffJitter(ctx context.Context) float64 { + jitter, err := strconv.ParseFloat(c.BackoffJitter, 64) + if err != nil { + logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err) + return 0.1 + } + + return jitter +} + // GetConfig Retrieves current global config for storage. func GetConfig(ctx context.Context) *Config { if c, ok := configSection.GetConfig().(*Config); ok { diff --git a/flytepropeller/events/config_flags.go b/flytepropeller/events/config_flags.go index 36e047e0aa..5d18653b45 100755 --- a/flytepropeller/events/config_flags.go +++ b/flytepropeller/events/config_flags.go @@ -54,5 +54,8 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "file-path"), defaultConfig.FilePath, "For file types, specify where the file should be located.") cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "rate"), defaultConfig.Rate, "Max rate at which events can be recorded per second.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "capacity"), defaultConfig.Capacity, "The max bucket size for event recording tokens.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-retries"), defaultConfig.MaxRetries, "The max number of retries for event recording.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "base-scalar"), defaultConfig.BackoffScalar, "The base/scalar backoff duration in milliseconds for event recording retries.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "backoff-jitter"), defaultConfig.BackoffJitter, "A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.") return cmdFlags } diff --git a/flytepropeller/events/config_flags_test.go b/flytepropeller/events/config_flags_test.go index e054350031..d55e39cbfd 100755 --- a/flytepropeller/events/config_flags_test.go +++ b/flytepropeller/events/config_flags_test.go @@ -155,4 +155,46 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_max-retries", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("max-retries", testValue) + if vInt, err := cmdFlags.GetInt("max-retries"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.MaxRetries) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_base-scalar", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("base-scalar", testValue) + if vInt, err := cmdFlags.GetInt("base-scalar"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.BackoffScalar) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_backoff-jitter", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("backoff-jitter", testValue) + if vString, err := cmdFlags.GetString("backoff-jitter"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.BackoffJitter) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flytepropeller/pkg/controller/nodes/catalog/cacheservice/cache_client.go b/flytepropeller/pkg/controller/nodes/catalog/cacheservice/cache_client.go index 54e367382f..19f5eca56f 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/cacheservice/cache_client.go +++ b/flytepropeller/pkg/controller/nodes/catalog/cacheservice/cache_client.go @@ -215,17 +215,16 @@ func (c *CacheClient) Put(ctx context.Context, key catalog.Key, reader io.Output } func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, endpoint string, insecureConnection bool, maxCacheAge time.Duration, - useAdminAuth bool, maxRetries int, maxPerRetryTimeout time.Duration, backOffScalar int, inlineCache bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CacheClient, error) { + useAdminAuth bool, maxRetries uint, backoffScalar int, backoffJitter float64, inlineCache bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CacheClient, error) { var opts []grpc.DialOption if useAdminAuth && authOpt != nil { opts = append(opts, authOpt...) } - grpcOptions := []grpcRetry.CallOption{ - grpcRetry.WithBackoff(grpcRetry.BackoffLinear(time.Duration(backOffScalar) * time.Millisecond)), + grpcOptions := []grpcRetry.CallOption{ + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(backoffScalar)*time.Millisecond, backoffJitter)), grpcRetry.WithCodes(codes.DeadlineExceeded, codes.Unavailable, codes.Canceled), - grpcRetry.WithMax(uint(maxRetries)), - grpcRetry.WithPerRetryTimeout(maxPerRetryTimeout), + grpcRetry.WithMax(maxRetries), } if insecureConnection { diff --git a/flytepropeller/pkg/controller/nodes/catalog/config.go b/flytepropeller/pkg/controller/nodes/catalog/config.go index 32cd0bf11f..3108df17a9 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/config.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config.go @@ -3,6 +3,7 @@ package catalog import ( "context" "fmt" + "strconv" "google.golang.org/grpc" @@ -11,6 +12,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/catalog/datacatalog" "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/storage" + "github.com/flyteorg/flyte/flytestdlib/logger" ) //go:generate pflags Config --default-var defaultConfig @@ -18,14 +20,22 @@ import ( const ConfigSectionKey = "catalog-cache" var ( - defaultConfig = &Config{ - Type: NoOpDiscoveryType, - MaxRetries: 5, - MaxPerRetryTimeout: config.Duration{Duration: 0}, - BackOffScalar: 100, + defaultConfig = Config{ + Type: NoOpDiscoveryType, + MaxRetries: 5, + BackoffScalar: 100, + BackoffJitter: "0.1", } - configSection = config.MustRegisterSection(ConfigSectionKey, defaultConfig) + configSection = config.MustRegisterSectionWithUpdates(ConfigSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) { + if newValue.(*Config).MaxRetries < 0 { + logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.") + } + + if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 { + logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter) + } + }) ) type DiscoveryType = string @@ -38,16 +48,16 @@ const ( ) type Config struct { - Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` - Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` - CacheEndpoint string `json:"cache-endpoint" pflag:"\"\", Endpoint for cache service"` - Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` - MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"` - UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"` - MaxRetries int `json:"max-retries" pflag:",Max number of gRPC retries"` - MaxPerRetryTimeout config.Duration `json:"max-per-retry-timeout" pflag:",gRPC per retry timeout. O means no timeout."` - BackOffScalar int `json:"backoff-scalar" pflag:",gRPC backoff scalar in milliseconds"` - InlineCache bool `json:"inline-cache" pflag:"false, Attempt to use in-line cache"` + Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` + Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` + CacheEndpoint string `json:"cache-endpoint" pflag:"\"\", Endpoint for cache service"` + Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` + MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"` + UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"` + MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."` + BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."` + BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."` + InlineCache bool `json:"inline-cache" pflag:"false, Attempt to use in-line cache"` // Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md // eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]} @@ -56,6 +66,16 @@ type Config struct { DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"` } +func (c Config) GetBackoffJitter(ctx context.Context) float64 { + jitter, err := strconv.ParseFloat(c.BackoffJitter, 64) + if err != nil { + logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err) + return 0.1 + } + + return jitter +} + // GetConfig gets loaded config for Discovery func GetConfig() *Config { return configSection.GetConfig().(*Config) @@ -67,20 +87,20 @@ func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, authOpt . switch catalogConfig.Type { case CacheServiceType: return cacheservice.NewCacheClient(ctx, dataStore, catalogConfig.CacheEndpoint, catalogConfig.Insecure, - catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.MaxRetries, - catalogConfig.MaxPerRetryTimeout.Duration, catalogConfig.BackOffScalar, catalogConfig.InlineCache, + catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, uint(catalogConfig.MaxRetries), + catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), catalogConfig.InlineCache, catalogConfig.DefaultServiceConfig, authOpt...) case FallbackType: cacheClient, err := cacheservice.NewCacheClient(ctx, dataStore, catalogConfig.CacheEndpoint, catalogConfig.Insecure, - catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.MaxRetries, - catalogConfig.MaxPerRetryTimeout.Duration, catalogConfig.BackOffScalar, catalogConfig.InlineCache, + catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, uint(catalogConfig.MaxRetries), + catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), catalogConfig.InlineCache, catalogConfig.DefaultServiceConfig, authOpt...) if err != nil { return nil, err } catalogClient, err := datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig, - authOpt...) + uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...) if err != nil { return nil, err } @@ -88,7 +108,7 @@ func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, authOpt . case DataCatalogType: return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig, - authOpt...) + uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...) case NoOpDiscoveryType, "": return NOOPCatalog{}, nil } diff --git a/flytepropeller/pkg/controller/nodes/catalog/config_flags.go b/flytepropeller/pkg/controller/nodes/catalog/config_flags.go index b640f3e558..b3372b1ebe 100755 --- a/flytepropeller/pkg/controller/nodes/catalog/config_flags.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config_flags.go @@ -56,9 +56,9 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "insecure"), defaultConfig.Insecure, " Use insecure grpc connection") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "max-cache-age"), defaultConfig.MaxCacheAge.String(), " Cache entries past this age will incur cache miss. 0 means cache never expires") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "use-admin-auth"), defaultConfig.UseAdminAuth, " Use the same gRPC credentials option as the flyteadmin client") - cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-retries"), defaultConfig.MaxRetries, "Max number of gRPC retries") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "max-per-retry-timeout"), defaultConfig.MaxPerRetryTimeout.String(), "gRPC per retry timeout. O means no timeout.") - cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "backoff-scalar"), defaultConfig.BackOffScalar, "gRPC backoff scalar in milliseconds") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-retries"), defaultConfig.MaxRetries, "The max number of retries for event recording.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "base-scalar"), defaultConfig.BackoffScalar, "The base/scalar backoff duration in milliseconds for event recording retries.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "backoff-jitter"), defaultConfig.BackoffJitter, "A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "inline-cache"), defaultConfig.InlineCache, " Attempt to use in-line cache") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-service-config"), defaultConfig.DefaultServiceConfig, " Set the default service config for the catalog gRPC client") return cmdFlags diff --git a/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go b/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go index c491d2a64c..ebb7c45f70 100755 --- a/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go @@ -197,28 +197,28 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_max-per-retry-timeout", func(t *testing.T) { + t.Run("Test_base-scalar", func(t *testing.T) { t.Run("Override", func(t *testing.T) { - testValue := defaultConfig.MaxPerRetryTimeout.String() + testValue := "1" - cmdFlags.Set("max-per-retry-timeout", testValue) - if vString, err := cmdFlags.GetString("max-per-retry-timeout"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.MaxPerRetryTimeout) + cmdFlags.Set("base-scalar", testValue) + if vInt, err := cmdFlags.GetInt("base-scalar"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.BackoffScalar) } else { assert.FailNow(t, err.Error()) } }) }) - t.Run("Test_backoff-scalar", func(t *testing.T) { + t.Run("Test_backoff-jitter", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("backoff-scalar", testValue) - if vInt, err := cmdFlags.GetInt("backoff-scalar"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.BackOffScalar) + cmdFlags.Set("backoff-jitter", testValue) + if vString, err := cmdFlags.GetString("backoff-jitter"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.BackoffJitter) } else { assert.FailNow(t, err.Error()) diff --git a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go index ab635f98dc..be3c467a55 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go +++ b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go @@ -444,16 +444,17 @@ func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, } // NewDataCatalog creates a new Datacatalog client for task execution caching -func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CatalogClient, error) { +func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, + useAdminAuth bool, defaultServiceConfig string, maxRetries uint, backoffScalar int, backoffJitter float64, authOpt ...grpc.DialOption) (*CatalogClient, error) { var opts []grpc.DialOption if useAdminAuth && authOpt != nil { opts = append(opts, authOpt...) } grpcOptions := []grpcRetry.CallOption{ - grpcRetry.WithBackoff(grpcRetry.BackoffLinear(100 * time.Millisecond)), + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(backoffScalar)*time.Millisecond, backoffJitter)), grpcRetry.WithCodes(codes.DeadlineExceeded, codes.Unavailable, codes.Canceled), - grpcRetry.WithMax(5), + grpcRetry.WithMax(maxRetries), } if insecureConnection {