Skip to content

Commit

Permalink
Feature/add pg client for caching (#170)
Browse files Browse the repository at this point in the history
* update cache service data model for relational store

Signed-off-by: Paul Dittamo <[email protected]>

* add placeholder migrations to successfully deploy cacheservice

Signed-off-by: Paul Dittamo <[email protected]>

* clean up - add sample cache service config

Signed-off-by: Paul Dittamo <[email protected]>

* add persistent flags for cache service

* set up postgres cache repo

Signed-off-by: Paul Dittamo <[email protected]>

* clean up

Signed-off-by: Paul Dittamo <[email protected]>

* add postgres clients for cache service

Signed-off-by: Paul Dittamo <[email protected]>

* unit test

Signed-off-by: Paul Dittamo <[email protected]>

* don't setup shared utils for mock pg db

Signed-off-by: Paul Dittamo <[email protected]>

* tidy

Signed-off-by: Paul Dittamo <[email protected]>

* cleanup

Signed-off-by: Paul Dittamo <[email protected]>

* test

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

* mod tidy

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

* tidy

Signed-off-by: Paul Dittamo <[email protected]>

* tidy

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Apr 4, 2024
1 parent db4b8b8 commit a688479
Show file tree
Hide file tree
Showing 44 changed files with 1,336 additions and 536 deletions.
9 changes: 7 additions & 2 deletions cacheservice/cmd/entrypoints/migrate.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package entrypoints

import (
"context"

"github.com/spf13/cobra"

"github.com/flyteorg/flyte/cacheservice/pkg/repositories"
)

var parentMigrateCmd = &cobra.Command{
Use: "migrate",
Short: "This command controls migration behavior for the Flyte cacheservice database. Please choose a subcommand.",
}

// This runs all the migrations. This is a placeholder for now as cache service does not have any migrations
// This runs all the migrations for sql databases
var migrateCmd = &cobra.Command{
Use: "run",
Short: "This command will run all the migrations for the database",
RunE: func(cmd *cobra.Command, args []string) error {
return nil
ctx := context.Background()
return repositories.Migrate(ctx)
},
}

Expand Down
15 changes: 15 additions & 0 deletions cacheservice/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,24 @@ module github.com/flyteorg/flyte/cacheservice
go 1.21

require (
github.com/Selvatico/go-mocket v1.0.7
github.com/aws/aws-sdk-go-v2/config v1.26.6
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.12.17
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/go-redis/redis/v8 v8.11.5
github.com/golang/glog v1.1.2
github.com/golang/protobuf v1.5.3
github.com/jackc/pgconn v1.14.1
github.com/mitchellh/mapstructure v1.5.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.opentelemetry.io/otel v1.22.0
google.golang.org/grpc v1.60.1
gorm.io/driver/postgres v1.5.3
gorm.io/gorm v1.25.4
)

require (
Expand All @@ -34,8 +38,19 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-gormigrate/gormigrate/v2 v2.1.1 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.2 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgx/v5 v5.4.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
golang.org/x/sync v0.4.0 // indirect
gorm.io/driver/sqlite v1.5.4 // indirect
)

require (
Expand Down
109 changes: 109 additions & 0 deletions cacheservice/go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion cacheservice/pkg/manager/impl/cache_data_store_test.go

This file was deleted.

20 changes: 11 additions & 9 deletions cacheservice/pkg/manager/impl/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import (
"github.com/flyteorg/flyte/cacheservice/pkg/errors"
"github.com/flyteorg/flyte/cacheservice/pkg/manager/impl/validators"
"github.com/flyteorg/flyte/cacheservice/pkg/manager/interfaces"
"github.com/flyteorg/flyte/cacheservice/repositories/models"
"github.com/flyteorg/flyte/cacheservice/repositories/transformers"
repoInterfaces "github.com/flyteorg/flyte/cacheservice/pkg/repositories/interfaces"
"github.com/flyteorg/flyte/cacheservice/pkg/repositories/models"
"github.com/flyteorg/flyte/cacheservice/pkg/repositories/transformers"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/cacheservice"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
Expand Down Expand Up @@ -49,8 +50,8 @@ type cacheMetrics struct {

type cacheManager struct {
outputStore interfaces.CacheOutputBlobStore
dataStore interfaces.CacheDataStoreClient
reservationStore interfaces.ReservationDataStoreClient
dataStore repoInterfaces.CachedOutputRepo
reservationStore repoInterfaces.ReservationRepo
systemMetrics cacheMetrics
maxInlineSizeBytes int64
heartbeatGracePeriodMultiplier time.Duration
Expand Down Expand Up @@ -111,6 +112,7 @@ func (m *cacheManager) Put(ctx context.Context, request *cacheservice.PutCacheRe
return nil, errors.NewCacheServiceErrorf(codes.Internal, "Failed to create cached output model, err: %v", err)
}

// TODO - @pvditt - can do this in single transaction w/ postgres client - move logic to client (still need to delete blob)
cachedOutput, err := m.dataStore.Get(ctx, request.Key)
var notFound bool
if err != nil {
Expand Down Expand Up @@ -208,7 +210,7 @@ func (m *cacheManager) GetOrExtendReservation(ctx context.Context, request *cach
heartbeatInterval = request.GetHeartbeatInterval().AsDuration()
}

newReservation := &models.Reservation{
newReservation := &models.CacheReservation{
Key: resKey,
OwnerID: request.OwnerId,
ExpiresAt: now.Add(heartbeatInterval * m.heartbeatGracePeriodMultiplier),
Expand All @@ -219,7 +221,7 @@ func (m *cacheManager) GetOrExtendReservation(ctx context.Context, request *cach
if reservationModel.ExpiresAt.Before(now) || reservationModel.OwnerID == request.OwnerId {
storeError = m.reservationStore.Update(ctx, newReservation, now)
} else {
logger.Debugf(ctx, "Reservation: %+v is held by %s", reservationModel.Key, reservationModel.OwnerID)
logger.Debugf(ctx, "CacheReservation: %+v is held by %s", reservationModel.Key, reservationModel.OwnerID)
reservation := transformers.FromReservationModel(ctx, reservationModel)
return &cacheservice.GetOrExtendReservationResponse{Reservation: reservation}, nil
}
Expand All @@ -229,7 +231,7 @@ func (m *cacheManager) GetOrExtendReservation(ctx context.Context, request *cach

if storeError != nil {
if status.Code(storeError) == codes.AlreadyExists {
logger.Debugf(ctx, "Reservation: %+v already exists", newReservation.Key)
logger.Debugf(ctx, "CacheReservation: %+v already exists", newReservation.Key)
newReservation, err = m.reservationStore.Get(ctx, resKey)
if err != nil {
logger.Errorf(ctx, "Failed to Get reservation in reservation store, err: %v", err)
Expand Down Expand Up @@ -265,7 +267,7 @@ func (m *cacheManager) ReleaseReservation(ctx context.Context, request *cacheser
err = m.reservationStore.Delete(ctx, resKey, request.OwnerId)
if err != nil {
if status.Code(err) == codes.NotFound {
logger.Debugf(ctx, "Reservation with key %v and owner %v not found", request.Key, request.OwnerId)
logger.Debugf(ctx, "CacheReservation with key %v and owner %v not found", request.Key, request.OwnerId)
m.systemMetrics.notFoundCounter.Inc(ctx)
return &cacheservice.ReleaseReservationResponse{}, nil
}
Expand All @@ -278,7 +280,7 @@ func (m *cacheManager) ReleaseReservation(ctx context.Context, request *cacheser
return &cacheservice.ReleaseReservationResponse{}, nil
}

func NewCacheManager(outputStore interfaces.CacheOutputBlobStore, dataStore interfaces.CacheDataStoreClient, reservationStore interfaces.ReservationDataStoreClient, maxInlineSizeBytes int64, cacheScope promutils.Scope,
func NewCacheManager(outputStore interfaces.CacheOutputBlobStore, dataStore repoInterfaces.CachedOutputRepo, reservationStore repoInterfaces.ReservationRepo, maxInlineSizeBytes int64, cacheScope promutils.Scope,
heartbeatGracePeriodMultiplier time.Duration, maxHeartbeatInterval time.Duration) interfaces.CacheManager {
cacheMetrics := cacheMetrics{
scope: cacheScope,
Expand Down
35 changes: 18 additions & 17 deletions cacheservice/pkg/manager/impl/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
"github.com/flyteorg/flyte/cacheservice/pkg/errors"
"github.com/flyteorg/flyte/cacheservice/pkg/manager/interfaces"
"github.com/flyteorg/flyte/cacheservice/pkg/manager/mocks"
"github.com/flyteorg/flyte/cacheservice/repositories/models"
repoMocks "github.com/flyteorg/flyte/cacheservice/pkg/repositories/mocks"
"github.com/flyteorg/flyte/cacheservice/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteidl/clients/go/coreutils"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/cacheservice"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -91,15 +92,15 @@ func TestCacheManager_Get(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockDataStore := &mocks.CacheDataStoreClient{}
mockDataStore := &repoMocks.CachedOutputRepo{}
mockDataStore.OnGetMatch(
ctx,
mock.MatchedBy(func(o string) bool {
assert.EqualValues(t, sampleKey, o)
return true
})).Return(tc.mockDataStoreReturn, tc.mockDataStoreError)

m := NewCacheManager(&mocks.CacheOutputBlobStore{}, mockDataStore, &mocks.ReservationDataStoreClient{}, 1024, mockScope.NewTestScope(), time.Duration(1), time.Duration(1))
m := NewCacheManager(&mocks.CacheOutputBlobStore{}, mockDataStore, &repoMocks.ReservationRepo{}, 1024, mockScope.NewTestScope(), time.Duration(1), time.Duration(1))

response, err := m.Get(ctx, tc.mockRequest)

Expand Down Expand Up @@ -373,7 +374,7 @@ func TestCacheManager_Put(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
mockDataStore := &mocks.CacheDataStoreClient{}
mockDataStore := &repoMocks.CachedOutputRepo{}
mockDataStore.OnGetMatch(
ctx,
mock.MatchedBy(matchKeyFunc),
Expand All @@ -395,7 +396,7 @@ func TestCacheManager_Put(t *testing.T) {
mock.MatchedBy(tc.outputDeleteMatchOutputFunc),
).Return(tc.outputDeleteError)

m := NewCacheManager(mockOutputStore, mockDataStore, &mocks.ReservationDataStoreClient{}, tc.maxSizeBytes, mockScope.NewTestScope(), time.Duration(1), time.Duration(1))
m := NewCacheManager(mockOutputStore, mockDataStore, &repoMocks.ReservationRepo{}, tc.maxSizeBytes, mockScope.NewTestScope(), time.Duration(1), time.Duration(1))

_, err := m.Put(ctx, tc.mockRequest)

Expand Down Expand Up @@ -431,9 +432,9 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
expectError bool
expectedErrorStatusCode codes.Code
deleteError error
getReturn *models.Reservation
getReturn *models.CacheReservation
getError error
getReturn1 *models.Reservation
getReturn1 *models.CacheReservation
getError1 error
createError error
updateError error
Expand Down Expand Up @@ -499,7 +500,7 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
},
getReturn: nil,
getError: errors.NewNotFoundError("reservation", sampleKey),
getReturn1: &models.Reservation{
getReturn1: &models.CacheReservation{
Key: sampleKey,
OwnerID: sampleOwner1,
ExpiresAt: now.Add(time.Hour),
Expand All @@ -517,7 +518,7 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
OwnerId: sampleOwner,
HeartbeatInterval: sampleHeartBeatInterval,
},
getReturn: &models.Reservation{
getReturn: &models.CacheReservation{
Key: sampleKey,
OwnerID: sampleOwner1,
ExpiresAt: now.Add(-time.Hour),
Expand All @@ -535,7 +536,7 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
OwnerId: sampleOwner,
HeartbeatInterval: sampleHeartBeatInterval,
},
getReturn: &models.Reservation{
getReturn: &models.CacheReservation{
Key: sampleKey,
OwnerID: sampleOwner,
ExpiresAt: now.Add(time.Hour),
Expand All @@ -553,7 +554,7 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
OwnerId: sampleOwner,
HeartbeatInterval: sampleHeartBeatInterval,
},
getReturn: &models.Reservation{
getReturn: &models.CacheReservation{
Key: sampleKey,
OwnerID: sampleOwner1,
ExpiresAt: now.Add(time.Hour),
Expand All @@ -570,7 +571,7 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
for _, tc := range testCases {
requestKey := fmt.Sprintf("%s:%s", "reservation", tc.mockRequest.Key)
t.Run(tc.name, func(t *testing.T) {
mockReservationStoreClient := &mocks.ReservationDataStoreClient{}
mockReservationStoreClient := &repoMocks.ReservationRepo{}
mockReservationStoreClient.OnGetMatch(
ctx,
mock.MatchedBy(func(o string) bool {
Expand All @@ -585,7 +586,7 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
})).Return(tc.getReturn1, tc.getError1)
mockReservationStoreClient.OnCreateMatch(
ctx,
mock.MatchedBy(func(reservation *models.Reservation) bool {
mock.MatchedBy(func(reservation *models.CacheReservation) bool {
assert.Equal(t, requestKey, reservation.Key)
assert.Equal(t, tc.expectedExpiresAtCall, reservation.ExpiresAt)
return true
Expand All @@ -594,15 +595,15 @@ func TestCacheManager_GetOrExtendReservation(t *testing.T) {
).Return(tc.createError)
mockReservationStoreClient.OnUpdateMatch(
ctx,
mock.MatchedBy(func(reservation *models.Reservation) bool {
mock.MatchedBy(func(reservation *models.CacheReservation) bool {
assert.Equal(t, requestKey, reservation.Key)
assert.Equal(t, tc.expectedExpiresAtCall, reservation.ExpiresAt)
return true
}),
mock.Anything,
).Return(tc.updateError)

m := NewCacheManager(&mocks.CacheOutputBlobStore{}, &mocks.CacheDataStoreClient{}, mockReservationStoreClient, 0, mockScope.NewTestScope(), heartbeatGracePeriodMultiplier, maxHeartBeatInterval)
m := NewCacheManager(&mocks.CacheOutputBlobStore{}, &repoMocks.CachedOutputRepo{}, mockReservationStoreClient, 0, mockScope.NewTestScope(), heartbeatGracePeriodMultiplier, maxHeartBeatInterval)

reservation, err := m.GetOrExtendReservation(ctx, tc.mockRequest, now)
if tc.expectError {
Expand Down Expand Up @@ -680,7 +681,7 @@ func TestCacheManager_ReleaseReservation(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
requestKey := fmt.Sprintf("%s:%s", "reservation", tc.mockRequest.Key)

mockReservationStoreClient := &mocks.ReservationDataStoreClient{}
mockReservationStoreClient := &repoMocks.ReservationRepo{}
mockReservationStoreClient.OnDeleteMatch(
ctx,
mock.MatchedBy(func(key string) bool {
Expand All @@ -692,7 +693,7 @@ func TestCacheManager_ReleaseReservation(t *testing.T) {
}),
).Return(tc.deleteError)

m := NewCacheManager(&mocks.CacheOutputBlobStore{}, &mocks.CacheDataStoreClient{}, mockReservationStoreClient, 0, mockScope.NewTestScope(), time.Duration(1), time.Duration(1))
m := NewCacheManager(&mocks.CacheOutputBlobStore{}, &repoMocks.CachedOutputRepo{}, mockReservationStoreClient, 0, mockScope.NewTestScope(), time.Duration(1), time.Duration(1))

reservation, err := m.ReleaseReservation(ctx, tc.mockRequest)
if tc.expectError {
Expand Down
2 changes: 1 addition & 1 deletion cacheservice/pkg/manager/impl/cache_output_blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *cacheOutputBlobStore) Delete(ctx context.Context, uri string) error {
return nil
}

func NewCacheOutputStore(store *storage.DataStore, storagePrefix storage.DataReference) interfaces.CacheOutputBlobStore {
func NewCacheOutputBlobStore(store *storage.DataStore, storagePrefix storage.DataReference) interfaces.CacheOutputBlobStore {
return &cacheOutputBlobStore{
store: store,
storagePrefix: storagePrefix,
Expand Down
1 change: 0 additions & 1 deletion cacheservice/pkg/manager/impl/reservation_store_test.go

This file was deleted.

14 changes: 0 additions & 14 deletions cacheservice/pkg/manager/interfaces/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/flyteorg/flyte/cacheservice/repositories/models"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/cacheservice"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)
Expand All @@ -23,16 +22,3 @@ type CacheOutputBlobStore interface {
Create(ctx context.Context, key string, output *core.LiteralMap) (string, error)
Delete(ctx context.Context, uri string) error
}

type CacheDataStoreClient interface {
Get(ctx context.Context, key string) (*models.CachedOutput, error)
Put(ctx context.Context, key string, cachedOutput *models.CachedOutput) error
Delete(ctx context.Context, key string) error
}

type ReservationDataStoreClient interface {
Create(ctx context.Context, reservation *models.Reservation, now time.Time) error
Update(ctx context.Context, reservation *models.Reservation, now time.Time) error
Get(ctx context.Context, key string) (*models.Reservation, error)
Delete(ctx context.Context, key string, ownerID string) error
}
Loading

0 comments on commit a688479

Please sign in to comment.