Skip to content

Commit

Permalink
cherry pick admin event backoff (#230)
Browse files Browse the repository at this point in the history
* [Feature] add retries and backoffs for propeller sending events to admin (#5166)

* add retries and backoffs for propeller sending events to admin

Signed-off-by: Paul Dittamo <[email protected]>

* update docs

Signed-off-by: Paul Dittamo <[email protected]>

* update retry types

Signed-off-by: Paul Dittamo <[email protected]>

* support expontential backoffs w/ jitter for catalog connection

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>

* propeller config generate

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
Co-authored-by: Paul Dittamo <[email protected]>
  • Loading branch information
hamersaw and pvditt authored Apr 24, 2024
1 parent 91dc63f commit 83cd81b
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 53 deletions.
72 changes: 72 additions & 0 deletions docs/deployment/configuration/generated/flytepropeller_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,42 @@ Use the same gRPC credentials option as the flyteadmin client
"false"
max-retries (int)
------------------------------------------------------------------------------------------------------------------------

The max number of retries for event recording.

**Default Value**:

.. code-block:: yaml
"5"
base-scalar (int)
------------------------------------------------------------------------------------------------------------------------

The base/scalar backoff duration in milliseconds for event recording retries.

**Default Value**:

.. code-block:: yaml
"100"
backoff-jitter (string)
------------------------------------------------------------------------------------------------------------------------

A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.

**Default Value**:

.. code-block:: yaml
"0.1"
default-service-config (string)
------------------------------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -671,6 +707,42 @@ The max bucket size for event recording tokens.
"1000"
max-retries (int)
------------------------------------------------------------------------------------------------------------------------

The max number of retries for event recording.

**Default Value**:

.. code-block:: yaml
"5"
base-scalar (int)
------------------------------------------------------------------------------------------------------------------------

The base/scalar backoff duration in milliseconds for event recording retries.

**Default Value**:

.. code-block:: yaml
"100"
backoff-jitter (string)
------------------------------------------------------------------------------------------------------------------------

A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries.

**Default Value**:

.. code-block:: yaml
"0.1"
Section: logger
========================================================================================================================

Expand Down
16 changes: 13 additions & 3 deletions flytepropeller/events/admin_eventsink.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package events
import (
"context"
"fmt"
"time"

"github.com/golang/protobuf/proto"
grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -128,15 +130,23 @@ func IDFromMessage(message proto.Message) ([]byte, error) {
return []byte(id), nil
}

func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) {
func initializeAdminClientFromConfig(ctx context.Context, config *Config) (client service.AdminServiceClient, err error) {
cfg := admin2.GetConfig(ctx)
tracerProvider := otelutils.GetTracerProvider(otelutils.AdminClientTracer)
opt := grpc.WithUnaryInterceptor(

grpcOptions := []grpcRetry.CallOption{
grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(config.BackoffScalar)*time.Millisecond, config.GetBackoffJitter(ctx))),
grpcRetry.WithMax(uint(config.MaxRetries)),
}

opt := grpc.WithChainUnaryInterceptor(
otelgrpc.UnaryClientInterceptor(
otelgrpc.WithTracerProvider(tracerProvider),
otelgrpc.WithPropagators(propagation.TraceContext{}),
),
grpcRetry.UnaryClientInterceptor(grpcOptions...),
)

clients, err := admin2.NewClientsetBuilder().WithDialOptions(opt).WithConfig(cfg).Build(ctx)
if err != nil {
return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err)
Expand All @@ -152,7 +162,7 @@ func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Sco
case EventSinkFile:
return NewFileSink(config.FilePath)
case EventSinkAdmin:
adminClient, err := initializeAdminClientFromConfig(ctx)
adminClient, err := initializeAdminClientFromConfig(ctx, config)
if err != nil {
return nil, err
}
Expand Down
41 changes: 33 additions & 8 deletions flytepropeller/events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package events

import (
"context"
"strconv"

"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/logger"
Expand All @@ -21,22 +22,46 @@ const (
)

type Config struct {
Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."`
FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."`
Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."`
Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."`
Type EventReportingType `json:"type" pflag:",Sets the type of EventSink to configure [log/admin/file]."`
FilePath string `json:"file-path" pflag:",For file types, specify where the file should be located."`
Rate int64 `json:"rate" pflag:",Max rate at which events can be recorded per second."`
Capacity int `json:"capacity" pflag:",The max bucket size for event recording tokens."`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`
}

var (
defaultConfig = Config{
Rate: int64(500),
Capacity: 1000,
Type: EventSinkAdmin,
Rate: int64(500),
Capacity: 1000,
Type: EventSinkAdmin,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
}

configSection = config.MustRegisterSection(configSectionKey, &defaultConfig)
configSection = config.MustRegisterSectionWithUpdates(configSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) {
if newValue.(*Config).MaxRetries < 0 {
logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.")
}

if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 {
logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter)
}
})
)

func (c Config) GetBackoffJitter(ctx context.Context) float64 {
jitter, err := strconv.ParseFloat(c.BackoffJitter, 64)
if err != nil {
logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err)
return 0.1
}

return jitter
}

// GetConfig Retrieves current global config for storage.
func GetConfig(ctx context.Context) *Config {
if c, ok := configSection.GetConfig().(*Config); ok {
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/events/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions flytepropeller/events/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -215,17 +215,16 @@ func (c *CacheClient) Put(ctx context.Context, key catalog.Key, reader io.Output
}

func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, endpoint string, insecureConnection bool, maxCacheAge time.Duration,
useAdminAuth bool, maxRetries int, maxPerRetryTimeout time.Duration, backOffScalar int, inlineCache bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CacheClient, error) {
useAdminAuth bool, maxRetries uint, backoffScalar int, backoffJitter float64, inlineCache bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CacheClient, error) {
var opts []grpc.DialOption
if useAdminAuth && authOpt != nil {
opts = append(opts, authOpt...)
}

grpcOptions := []grpcRetry.CallOption{
grpcRetry.WithBackoff(grpcRetry.BackoffLinear(time.Duration(backOffScalar) * time.Millisecond)),
grpcOptions := []grpcRetry.CallOption{
grpcRetry.WithBackoff(grpcRetry.BackoffExponentialWithJitter(time.Duration(backoffScalar)*time.Millisecond, backoffJitter)),
grpcRetry.WithCodes(codes.DeadlineExceeded, codes.Unavailable, codes.Canceled),
grpcRetry.WithMax(uint(maxRetries)),
grpcRetry.WithPerRetryTimeout(maxPerRetryTimeout),
grpcRetry.WithMax(maxRetries),
}

if insecureConnection {
Expand Down
64 changes: 42 additions & 22 deletions flytepropeller/pkg/controller/nodes/catalog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package catalog
import (
"context"
"fmt"
"strconv"

"google.golang.org/grpc"

Expand All @@ -11,21 +12,30 @@ import (
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/catalog/datacatalog"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

//go:generate pflags Config --default-var defaultConfig

const ConfigSectionKey = "catalog-cache"

var (
defaultConfig = &Config{
Type: NoOpDiscoveryType,
MaxRetries: 5,
MaxPerRetryTimeout: config.Duration{Duration: 0},
BackOffScalar: 100,
defaultConfig = Config{
Type: NoOpDiscoveryType,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
}

configSection = config.MustRegisterSection(ConfigSectionKey, defaultConfig)
configSection = config.MustRegisterSectionWithUpdates(ConfigSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) {
if newValue.(*Config).MaxRetries < 0 {
logger.Panicf(ctx, "Admin configuration given with negative gRPC retry value.")
}

if jitter, err := strconv.ParseFloat(newValue.(*Config).BackoffJitter, 64); err != nil || jitter < 0 || jitter > 1 {
logger.Panicf(ctx, "Invalid jitter value [%v]. Must be between 0 and 1.", jitter)
}
})
)

type DiscoveryType = string
Expand All @@ -38,16 +48,16 @@ const (
)

type Config struct {
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
CacheEndpoint string `json:"cache-endpoint" pflag:"\"\", Endpoint for cache service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
MaxRetries int `json:"max-retries" pflag:",Max number of gRPC retries"`
MaxPerRetryTimeout config.Duration `json:"max-per-retry-timeout" pflag:",gRPC per retry timeout. O means no timeout."`
BackOffScalar int `json:"backoff-scalar" pflag:",gRPC backoff scalar in milliseconds"`
InlineCache bool `json:"inline-cache" pflag:"false, Attempt to use in-line cache"`
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
CacheEndpoint string `json:"cache-endpoint" pflag:"\"\", Endpoint for cache service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`
InlineCache bool `json:"inline-cache" pflag:"false, Attempt to use in-line cache"`

// Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md
// eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]}
Expand All @@ -56,6 +66,16 @@ type Config struct {
DefaultServiceConfig string `json:"default-service-config" pflag:"\"\", Set the default service config for the catalog gRPC client"`
}

func (c Config) GetBackoffJitter(ctx context.Context) float64 {
jitter, err := strconv.ParseFloat(c.BackoffJitter, 64)
if err != nil {
logger.Warnf(ctx, "Failed to parse backoff jitter [%v]. Error: %v", c.BackoffJitter, err)
return 0.1
}

return jitter
}

// GetConfig gets loaded config for Discovery
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand All @@ -67,28 +87,28 @@ func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, authOpt .
switch catalogConfig.Type {
case CacheServiceType:
return cacheservice.NewCacheClient(ctx, dataStore, catalogConfig.CacheEndpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.MaxRetries,
catalogConfig.MaxPerRetryTimeout.Duration, catalogConfig.BackOffScalar, catalogConfig.InlineCache,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, uint(catalogConfig.MaxRetries),
catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), catalogConfig.InlineCache,
catalogConfig.DefaultServiceConfig, authOpt...)
case FallbackType:
cacheClient, err := cacheservice.NewCacheClient(ctx, dataStore, catalogConfig.CacheEndpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.MaxRetries,
catalogConfig.MaxPerRetryTimeout.Duration, catalogConfig.BackOffScalar, catalogConfig.InlineCache,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, uint(catalogConfig.MaxRetries),
catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), catalogConfig.InlineCache,
catalogConfig.DefaultServiceConfig, authOpt...)
if err != nil {
return nil, err
}
catalogClient, err := datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig,
authOpt...)
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...)
if err != nil {
return nil, err
}
return cacheservice.NewFallbackClient(cacheClient, catalogClient)
case DataCatalogType:
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig,
authOpt...)
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...)
case NoOpDiscoveryType, "":
return NOOPCatalog{}, nil
}
Expand Down
Loading

0 comments on commit 83cd81b

Please sign in to comment.