Skip to content

Commit

Permalink
[Feature] reduce cache reservation calls (#283)
Browse files Browse the repository at this point in the history
* utilize LRU cache to reduce GetOrExtendCatalogReservation calls to control plane caching service

Signed-off-by: Paul Dittamo <[email protected]>

* tidy

Signed-off-by: Paul Dittamo <[email protected]>

* update test

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

* revert mockery change

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

* set heartbeat multiplier to 1

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored May 17, 2024
1 parent 30c3790 commit 06190b7
Show file tree
Hide file tree
Showing 14 changed files with 222 additions and 45 deletions.
9 changes: 9 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/catalog/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type Key struct {
InputReader io.InputReader
}

type ReservationCache struct {
Timestamp time.Time
ReservationStatus core.CatalogReservation_Status
}

func (k Key) String() string {
return fmt.Sprintf("%v:%v", k.Identifier, k.CacheVersion)
}
Expand Down Expand Up @@ -139,6 +144,10 @@ type Client interface {
Update(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error)
// ReleaseReservation releases an acquired reservation for the given key and owner ID.
ReleaseReservation(ctx context.Context, key Key, ownerID string) error
// GetReservationCache checks the reservation cache for the given owner ID
GetReservationCache(ownerID string) ReservationCache
// UpdateReservationCache updates the reservation cache for the given owner ID
UpdateReservationCache(ownerID string, entry ReservationCache)
}

func IsNotFound(err error) bool {
Expand Down
37 changes: 37 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/googleapis/gax-go/v2 v2.12.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/hashicorp/golang-lru v0.5.4
github.com/imdario/mergo v0.3.13
github.com/magiconair/properties v1.8.6
github.com/mitchellh/mapstructure v1.5.0
Expand Down Expand Up @@ -99,7 +100,6 @@ require (
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
31 changes: 20 additions & 11 deletions flytepropeller/pkg/controller/nodes/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,37 +171,46 @@ func (n *nodeExecutor) CheckCatalogCache(ctx context.Context, nCtx interfaces.No
// cacheable and cache serializable. If the reservation already exists for this owner, the
// reservation is extended.
func (n *nodeExecutor) GetOrExtendCatalogReservation(ctx context.Context, nCtx interfaces.NodeExecutionContext,
cacheHandler interfaces.CacheableNodeHandler, heartbeatInterval time.Duration) (catalog.ReservationEntry, error) {
cacheHandler interfaces.CacheableNodeHandler, heartbeatInterval time.Duration) (core.CatalogReservation_Status, error) {

catalogKey, err := getCatalogKeyWithOverrides(ctx, nCtx, cacheHandler)
if err != nil {
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
errors.Wrapf(err, "failed to initialize the catalogKey")
return core.CatalogReservation_RESERVATION_DISABLED, errors.Wrapf(err, "failed to initialize the catalogKey")
}

ownerID, err := computeCatalogReservationOwnerID(nCtx)
if err != nil {
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_DISABLED),
errors.Wrapf(err, "failed to initialize the cache reservation ownerID")
return core.CatalogReservation_RESERVATION_DISABLED, errors.Wrapf(err, "failed to initialize the cache reservation ownerID")
}

// TODO - pvditt - set multiplier via config ie (cfg.Multiplier - 1) * heartbeatInterval
evalInterval := heartbeatInterval
// Optimization to avoid checking the reservation status every loop
entry := n.cache.GetReservationCache(ownerID)
if !entry.Timestamp.IsZero() {
if time.Since(entry.Timestamp) < evalInterval {
return entry.ReservationStatus, nil
}
}

reservation, err := n.cache.GetOrExtendReservation(ctx, catalogKey, ownerID, heartbeatInterval)
if err != nil {
n.metrics.reservationGetFailureCount.Inc(ctx)
logger.Errorf(ctx, "Catalog Failure: reservation get or extend failed. err: %v", err.Error())
return catalog.NewReservationEntryStatus(core.CatalogReservation_RESERVATION_FAILURE), err
return core.CatalogReservation_RESERVATION_FAILURE, err
}

var status core.CatalogReservation_Status
var reservationStatus core.CatalogReservation_Status
if reservation.OwnerId == ownerID {
status = core.CatalogReservation_RESERVATION_ACQUIRED
reservationStatus = core.CatalogReservation_RESERVATION_ACQUIRED
} else {
status = core.CatalogReservation_RESERVATION_EXISTS
reservationStatus = core.CatalogReservation_RESERVATION_EXISTS
}

n.cache.UpdateReservationCache(ownerID, catalog.ReservationCache{Timestamp: time.Now(), ReservationStatus: reservationStatus})

n.metrics.reservationGetSuccessCount.Inc(ctx)
return catalog.NewReservationEntry(reservation.ExpiresAt.AsTime(),
reservation.HeartbeatInterval.AsDuration(), reservation.OwnerId, status), nil
return reservationStatus, nil
}

// ReleaseCatalogReservation attempts to release an artifact reservation if the task is cacheable
Expand Down
58 changes: 50 additions & 8 deletions flytepropeller/pkg/controller/nodes/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,20 +300,54 @@ func TestGetCatalogKeyWithOverrides(t *testing.T) {
}

func TestGetOrExtendCatalogReservation(t *testing.T) {

heartBeatInterval := time.Second * 30
tests := []struct {
name string
reservationOwnerID string
expectedReservationStatus core.CatalogReservation_Status
reservationCacheStatus catalog.ReservationCache
notExpectClientCall bool
}{
{
"Acquired",
"bar-foo-1-baz-0",
core.CatalogReservation_RESERVATION_ACQUIRED,
name: "Acquired",
reservationOwnerID: "bar-foo-1-baz-0",
expectedReservationStatus: core.CatalogReservation_RESERVATION_ACQUIRED,
reservationCacheStatus: catalog.ReservationCache{},
},
{
name: "Exists",
reservationOwnerID: "some-other-owner",
expectedReservationStatus: core.CatalogReservation_RESERVATION_EXISTS,
reservationCacheStatus: catalog.ReservationCache{},
},
{
name: "Cache hit - acquired",
reservationCacheStatus: catalog.ReservationCache{
Timestamp: time.Now(),
ReservationStatus: core.CatalogReservation_RESERVATION_ACQUIRED,
},
expectedReservationStatus: core.CatalogReservation_RESERVATION_ACQUIRED,
notExpectClientCall: true,
},
{
name: "Cache hit - exists",
reservationCacheStatus: catalog.ReservationCache{
Timestamp: time.Now(),
ReservationStatus: core.CatalogReservation_RESERVATION_EXISTS,
},
expectedReservationStatus: core.CatalogReservation_RESERVATION_EXISTS,
notExpectClientCall: true,
},
{
"Exists",
"some-other-owner",
core.CatalogReservation_RESERVATION_EXISTS,
name: "Cache expired",
reservationOwnerID: "bar-foo-1-baz-0",
reservationCacheStatus: catalog.ReservationCache{
Timestamp: time.Now().Add(-3 * heartBeatInterval),
ReservationStatus: core.CatalogReservation_RESERVATION_EXISTS,
},
expectedReservationStatus: core.CatalogReservation_RESERVATION_ACQUIRED,
notExpectClientCall: false,
},
}

Expand Down Expand Up @@ -343,6 +377,8 @@ func TestGetOrExtendCatalogReservation(t *testing.T) {
},
nil,
)
catalogClient.OnGetReservationCache(mock.Anything).Return(test.reservationCacheStatus)
catalogClient.On("UpdateReservationCache", mock.Anything, mock.Anything).Return()

nodeExecutor := &nodeExecutor{
cache: catalogClient,
Expand All @@ -351,11 +387,17 @@ func TestGetOrExtendCatalogReservation(t *testing.T) {
nCtx := setupCacheableNodeExecutionContext(nil, &core.TaskTemplate{})

// execute catalog cache check
reservationEntry, err := nodeExecutor.GetOrExtendCatalogReservation(context.TODO(), nCtx, cacheableHandler, time.Second*30)
reservationStatus, err := nodeExecutor.GetOrExtendCatalogReservation(context.TODO(), nCtx, cacheableHandler, heartBeatInterval)
assert.NoError(t, err)

// validate the result cache entry status
assert.Equal(t, test.expectedReservationStatus, reservationEntry.GetStatus())
assert.Equal(t, test.expectedReservationStatus, reservationStatus)

if test.notExpectClientCall {
catalogClient.AssertNotCalled(t, "GetOrExtendReservation", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
} else {
catalogClient.AssertCalled(t, "GetOrExtendReservation", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/golang/protobuf/ptypes"
grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -37,6 +38,7 @@ type CacheClient struct {
store *storage.DataStore
maxCacheAge time.Duration
inlineCache bool
lruMap *lru.Cache
}

func (c *CacheClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, heartbeatInterval time.Duration) (*catalogIdl.Reservation, error) {
Expand Down Expand Up @@ -220,8 +222,21 @@ func (c *CacheClient) Put(ctx context.Context, key catalog.Key, reader io.Output
return c.put(ctx, key, reader, metadata, false)
}

func (c *CacheClient) GetReservationCache(ownerID string) catalog.ReservationCache {
if val, ok := c.lruMap.Get(ownerID); ok {
return val.(catalog.ReservationCache)
}

return catalog.ReservationCache{}
}

func (c *CacheClient) UpdateReservationCache(ownerID string, entry catalog.ReservationCache) {
c.lruMap.Add(ownerID, entry)
}

func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, endpoint string, insecureConnection bool, maxCacheAge time.Duration,
useAdminAuth bool, maxRetries uint, backoffScalar int, backoffJitter float64, inlineCache bool, defaultServiceConfig string, authOpt ...grpc.DialOption) (*CacheClient, error) {
useAdminAuth bool, maxRetries uint, backoffScalar int, backoffJitter float64, inlineCache bool, defaultServiceConfig string,
reservationMaxCacheSize int, authOpt ...grpc.DialOption) (*CacheClient, error) {
var opts []grpc.DialOption
if useAdminAuth && authOpt != nil {
opts = append(opts, authOpt...)
Expand Down Expand Up @@ -267,11 +282,18 @@ func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, endpoint
}
client := cacheservice.NewCacheServiceClient(clientConn)

var evictionFunction func(key interface{}, value interface{})
lruCache, err := lru.NewWithEvict(reservationMaxCacheSize, evictionFunction)
if err != nil {
return nil, err
}

return &CacheClient{
client: client,
store: dataStore,
maxCacheAge: maxCacheAge,
inlineCache: inlineCache,
lruMap: lruCache,
}, nil

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ type FallbackClient struct {
dataStore *storage.DataStore
}

func (c *FallbackClient) GetReservationCache(ownerID string) catalog.ReservationCache {
return c.cacheClient.GetReservationCache(ownerID)
}

func (c *FallbackClient) UpdateReservationCache(ownerID string, entry catalog.ReservationCache) {
c.cacheClient.UpdateReservationCache(ownerID, entry)
}

func (c *FallbackClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, heartbeatInterval time.Duration) (*catalogIdl.Reservation, error) {
return c.cacheClient.GetOrExtendReservation(ctx, key, ownerID, heartbeatInterval)
}
Expand Down
40 changes: 22 additions & 18 deletions flytepropeller/pkg/controller/nodes/catalog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ const ConfigSectionKey = "catalog-cache"

var (
defaultConfig = Config{
Type: NoOpDiscoveryType,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
Type: NoOpDiscoveryType,
MaxRetries: 5,
BackoffScalar: 100,
BackoffJitter: "0.1",
ReservationMaxCacheSize: 10000,
}

configSection = config.MustRegisterSectionWithUpdates(ConfigSectionKey, &defaultConfig, func(ctx context.Context, newValue config.Config) {
Expand All @@ -48,16 +49,17 @@ const (
)

type Config struct {
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
CacheEndpoint string `json:"cache-endpoint" pflag:"\"\", Endpoint for cache service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`
InlineCache bool `json:"inline-cache" pflag:"false, Attempt to use in-line cache"`
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
CacheEndpoint string `json:"cache-endpoint" pflag:"\"\", Endpoint for cache service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
UseAdminAuth bool `json:"use-admin-auth" pflag:"false, Use the same gRPC credentials option as the flyteadmin client"`
MaxRetries int `json:"max-retries" pflag:",The max number of retries for event recording."`
BackoffScalar int `json:"base-scalar" pflag:",The base/scalar backoff duration in milliseconds for event recording retries."`
BackoffJitter string `json:"backoff-jitter" pflag:",A string representation of a floating point number between 0 and 1 specifying the jitter factor for event recording retries."`
InlineCache bool `json:"inline-cache" pflag:"false, Attempt to use in-line cache"`
ReservationMaxCacheSize int `json:"reservation-cache-size" pflag:", The max size of the reservation cache"`

// Set the gRPC service config formatted as a json string https://github.com/grpc/grpc/blob/master/doc/service_config.md
// eg. {"loadBalancingConfig": [{"round_robin":{}}], "methodConfig": [{"name":[{"service": "foo", "method": "bar"}, {"service": "baz"}], "timeout": "1.000000001s"}]}
Expand Down Expand Up @@ -89,26 +91,28 @@ func NewCacheClient(ctx context.Context, dataStore *storage.DataStore, authOpt .
return cacheservice.NewCacheClient(ctx, dataStore, catalogConfig.CacheEndpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, uint(catalogConfig.MaxRetries),
catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), catalogConfig.InlineCache,
catalogConfig.DefaultServiceConfig, authOpt...)
catalogConfig.DefaultServiceConfig, catalogConfig.ReservationMaxCacheSize, authOpt...)
case FallbackType:
cacheClient, err := cacheservice.NewCacheClient(ctx, dataStore, catalogConfig.CacheEndpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, uint(catalogConfig.MaxRetries),
catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), catalogConfig.InlineCache,
catalogConfig.DefaultServiceConfig, authOpt...)
catalogConfig.DefaultServiceConfig, catalogConfig.ReservationMaxCacheSize, authOpt...)
if err != nil {
return nil, err
}
catalogClient, err := datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig,
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...)
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx),
catalogConfig.ReservationMaxCacheSize, authOpt...)
if err != nil {
return nil, err
}
return cacheservice.NewFallbackClient(cacheClient, catalogClient, dataStore)
case DataCatalogType:
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure,
catalogConfig.MaxCacheAge.Duration, catalogConfig.UseAdminAuth, catalogConfig.DefaultServiceConfig,
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx), authOpt...)
uint(catalogConfig.MaxRetries), catalogConfig.BackoffScalar, catalogConfig.GetBackoffJitter(ctx),
catalogConfig.ReservationMaxCacheSize, authOpt...)
case NoOpDiscoveryType, "":
return NOOPCatalog{}, nil
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 06190b7

Please sign in to comment.