diff --git a/docs/deployment/configuration/generated/flytepropeller_config.rst b/docs/deployment/configuration/generated/flytepropeller_config.rst index b8fbe49f4e..c0a6e90fb9 100644 --- a/docs/deployment/configuration/generated/flytepropeller_config.rst +++ b/docs/deployment/configuration/generated/flytepropeller_config.rst @@ -608,6 +608,42 @@ Use the same gRPC credentials option as the flyteadmin client "false" +max-retries (int) +------------------------------------------------------------------------------------------------------------------------ + +The max number of retries for event recording. + +**Default Value**: + +.. code-block:: yaml + + "5" + + +base-scalar (int) +------------------------------------------------------------------------------------------------------------------------ + +The base/scalar backoff duration in milliseconds for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "100" + + +backoff-jitter (string) +------------------------------------------------------------------------------------------------------------------------ + +A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "0.1" + + default-service-config (string) ------------------------------------------------------------------------------------------------------------------------ @@ -671,6 +707,42 @@ The max bucket size for event recording tokens. "1000" +max-retries (int) +------------------------------------------------------------------------------------------------------------------------ + +The max number of retries for event recording. + +**Default Value**: + +.. code-block:: yaml + + "5" + + +base-scalar (int) +------------------------------------------------------------------------------------------------------------------------ + +The base/scalar backoff duration in milliseconds for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "100" + + +backoff-jitter (string) +------------------------------------------------------------------------------------------------------------------------ + +A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries. + +**Default Value**: + +.. code-block:: yaml + + "0.1" + + Section: logger ======================================================================================================================== diff --git a/flytepropeller/events/admin_eventsink.go b/flytepropeller/events/admin_eventsink.go index df75348b34..cb4b88a69a 100644 --- a/flytepropeller/events/admin_eventsink.go +++ b/flytepropeller/events/admin_eventsink.go @@ -3,8 +3,10 @@ package events import ( "context" "fmt" + "time" "github.com/golang/protobuf/proto" + grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel/propagation" "golang.org/x/time/rate" @@ -128,15 +130,23 @@ func IDFromMessage(message proto.Message) ([]byte, error) { return []byte(id), nil } -func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) { +func initializeAdminClientFromConfig(ctx context.Context, config *Config) (client service.AdminServiceClient, err error) { cfg := admin2.GetConfig(ctx) tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer) - opt := grpc.WithUnaryInterceptor( + + grpcOptions := []grpcRetry.CallOption{ + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.GetBackoffJitter(ctx))), + grpcRetry.WithMax(uint(config.MaxRetries)), + } + + opt := grpc.WithChainUnaryInterceptor( otelgrpc.UnaryClientInterceptor( otelgrpc.WithTracerProvider(tracerProvider), otelgrpc.WithPropagators(propagation.TraceContext{}), ), + grpcRetry.UnaryClientInterceptor(grpcOptions...), ) + clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx) if err != nil { return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err) @@ -152,7 +162,7 @@ func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Sco case EventSinkFile: return NewFileSink(config.FilePath) case EventSinkAdmin: - adminClient, err := initializeAdminClientFromConfig(ctx) + adminClient, err := initializeAdminClientFromConfig(ctx, config) if err != nil { return nil, err } diff --git a/flytepropeller/events/config.go b/flytepropeller/events/config.go index 895ae009b3..0e6d5a377c 100644 --- a/flytepropeller/events/config.go +++ b/flytepropeller/events/config.go @@ -2,6 +2,7 @@ package events import ( "context" + "strconv" "github.com/flyteorg/flyte/flytestdlib/config" "github.com/flyteorg/flyte/flytestdlib/logger" @@ -21,22 +22,46 @@ const ( ) type Config struct { - Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."` - FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."` - Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."` - Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."` + Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."` + FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."` + Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."` + Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."` + MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."` + BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."` + BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."` } var ( defaultConfig = Config{ - Rate: int64(500), - Capacity: 1000, - Type: EventSinkAdmin, + Rate: int64(500), + Capacity: 1000, + Type: EventSinkAdmin, + MaxRetries: 5, + BackoffScalar: 100, + BackoffJitter: "0.1", } - configSection = config.MustRegisterSection(configSectionKey, &defaultConfig) + configSection = config.MustRegisterSectionWithUpdates(configSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) { + if newValue.(*Config).MaxRetries < 0 { + logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.") + } + + if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 { + logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter) + } + }) ) +func (c Config) GetBackoffJitter(ctx context.Context) float64 { + jitter, err := strconv.ParseFloat(c.BackoffJitter, 64) + if err != nil { + logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err) + return 0.1 + } + + return jitter +} + // GetConfig Retrieves current global config for storage. func GetConfig(ctx context.Context) *Config { if c, ok := configSection.GetConfig().(*Config); ok { diff --git a/flytepropeller/events/config_flags.go b/flytepropeller/events/config_flags.go index 36e047e0aa..5d18653b45 100755 --- a/flytepropeller/events/config_flags.go +++ b/flytepropeller/events/config_flags.go @@ -54,5 +54,8 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "file-path"), defaultConfig.FilePath, "For file types, specify where the file should be located.") cmdFlags.Int64(fmt.Sprintf("%v%v", prefix, "rate"), defaultConfig.Rate, "Max rate at which events can be recorded per second.") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "capacity"), defaultConfig.Capacity, "The max bucket size for event recording tokens.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-retries"), defaultConfig.MaxRetries, "The max number of retries for event recording.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "base-scalar"), defaultConfig.BackoffScalar, "The base/scalar backoff duration in milliseconds for event recording retries.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "backoff-jitter"), defaultConfig.BackoffJitter, "A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.") return cmdFlags } diff --git a/flytepropeller/events/config_flags_test.go b/flytepropeller/events/config_flags_test.go index e054350031..d55e39cbfd 100755 --- a/flytepropeller/events/config_flags_test.go +++ b/flytepropeller/events/config_flags_test.go @@ -155,4 +155,46 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_max-retries", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("max-retries", testValue) + if vInt, err := cmdFlags.GetInt("max-retries"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.MaxRetries) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_base-scalar", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("base-scalar", testValue) + if vInt, err := cmdFlags.GetInt("base-scalar"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.BackoffScalar) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_backoff-jitter", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("backoff-jitter", testValue) + if vString, err := cmdFlags.GetString("backoff-jitter"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.BackoffJitter) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flytepropeller/pkg/controller/nodes/catalog/config.go b/flytepropeller/pkg/controller/nodes/catalog/config.go index 2b0c484fb7..4dd7bc70ae 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/config.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config.go @@ -3,12 +3,14 @@ package catalog import ( "context" "fmt" + "strconv" "google.golang.org/grpc" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/catalog" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/catalog/datacatalog" "github.com/flyteorg/flyte/flytestdlib/config" + "github.com/flyteorg/flyte/flytestdlib/logger" ) //go:generate pflags Config --default-var defaultConfig @@ -16,11 +18,22 @@ import ( const ConfigSectionKey = "catalog-cache" var ( - defaultConfig = &Config{ - Type: NoOpDiscoveryType, + defaultConfig = Config{ + Type: NoOpDiscoveryType, + MaxRetries: 5, + BackoffScalar: 100, + BackoffJitter: "0.1", } - configSection = config.MustRegisterSection(ConfigSectionKey, defaultConfig) + configSection = config.MustRegisterSectionWithUpdates(ConfigSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) { + if newValue.(*Config).MaxRetries < 0 { + logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.") + } + + if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 { + logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter) + } + }) ) type DiscoveryType = string @@ -31,11 +44,14 @@ const ( ) type Config struct { - Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` - Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` - Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` - MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"` - UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"` + Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"` + Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"` + Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"` + MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"` + UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"` + MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."` + BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."` + BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."` // Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md // eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]} @@ -44,6 +60,16 @@ type Config struct { DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"` } +func (c Config) GetBackoffJitter(ctx context.Context) float64 { + jitter, err := strconv.ParseFloat(c.BackoffJitter, 64) + if err != nil { + logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err) + return 0.1 + } + + return jitter +} + // GetConfig gets loaded config for Discovery func GetConfig() *Config { return configSection.GetConfig().(*Config) @@ -56,7 +82,7 @@ func NewCatalogClient(ctx context.Context, authOpt ...grpc.DialOption) (catalog. case DataCatalogType: return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig, - authOpt...) + uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...) case NoOpDiscoveryType, "": return NOOPCatalog{}, nil } diff --git a/flytepropeller/pkg/controller/nodes/catalog/config_flags.go b/flytepropeller/pkg/controller/nodes/catalog/config_flags.go index 4bd40b20b4..1f9a748b2d 100755 --- a/flytepropeller/pkg/controller/nodes/catalog/config_flags.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config_flags.go @@ -55,6 +55,9 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "insecure"), defaultConfig.Insecure, " Use insecure grpc connection") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "max-cache-age"), defaultConfig.MaxCacheAge.String(), " Cache entries past this age will incur cache miss. 0 means cache never expires") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "use-admin-auth"), defaultConfig.UseAdminAuth, " Use the same gRPC credentials option as the flyteadmin client") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-retries"), defaultConfig.MaxRetries, "The max number of retries for event recording.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "base-scalar"), defaultConfig.BackoffScalar, "The base/scalar backoff duration in milliseconds for event recording retries.") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "backoff-jitter"), defaultConfig.BackoffJitter, "A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-service-config"), defaultConfig.DefaultServiceConfig, " Set the default service config for the catalog gRPC client") return cmdFlags } diff --git a/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go b/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go index 3b18a9282d..2e7537e981 100755 --- a/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go +++ b/flytepropeller/pkg/controller/nodes/catalog/config_flags_test.go @@ -169,6 +169,48 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_max-retries", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("max-retries", testValue) + if vInt, err := cmdFlags.GetInt("max-retries"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.MaxRetries) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_base-scalar", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("base-scalar", testValue) + if vInt, err := cmdFlags.GetInt("base-scalar"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.BackoffScalar) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_backoff-jitter", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("backoff-jitter", testValue) + if vString, err := cmdFlags.GetString("backoff-jitter"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.BackoffJitter) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) t.Run("Test_default-service-config", func(t *testing.T) { t.Run("Override", func(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go index 6a029255a8..a04623f88a 100644 --- a/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go +++ b/flytepropeller/pkg/controller/nodes/catalog/datacatalog/datacatalog.go @@ -452,16 +452,17 @@ func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, } // NewDataCatalog creates a new Datacatalog client for task execution caching -func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CatalogClient, error) { +func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, + useAdminAuth bool, defaultServiceConfig string, maxRetries uint, backoffScalar int, backoffJitter float64, authOpt ...grpc.DialOption) (*CatalogClient, error) { var opts []grpc.DialOption if useAdminAuth && authOpt != nil { opts = append(opts, authOpt...) } grpcOptions := []grpcRetry.CallOption{ - grpcRetry.WithBackoff(grpcRetry.BackoffLinear(100 * time.Millisecond)), + grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(backoffScalar)*time.Millisecond, backoffJitter)), grpcRetry.WithCodes(codes.DeadlineExceeded, codes.Unavailable, codes.Canceled), - grpcRetry.WithMax(5), + grpcRetry.WithMax(maxRetries), } if insecureConnection {