Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge pull request #73 from lyft/bump-datacatalog
Browse files Browse the repository at this point in the history
DataCatalog Artifacts over MaxCacheAge will skip cache retrieval
  • Loading branch information
chanadian authored Feb 20, 2020
2 parents 6e5cd97 + 515e119 commit 04b5a13
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 17 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.1.2
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.2
github.com/lyft/flyteplugins v0.3.2
github.com/lyft/flytestdlib v0.3.2
Expand Down Expand Up @@ -48,7 +48,6 @@ require (
replace (
github.com/GoogleCloudPlatform/spark-on-k8s-operator => github.com/lyft/spark-on-k8s-operator v0.1.3
github.com/googleapis/gnostic => github.com/googleapis/gnostic v0.3.1
github.com/lyft/datacatalog => github.com/lyft/datacatalog v0.1.1
gopkg.in/fsnotify.v1 => github.com/fsnotify/fsnotify v1.4.7
k8s.io/api => github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0
k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f
Expand Down
44 changes: 44 additions & 0 deletions go.sum

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions pkg/controller/nodes/task/catalog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ const (
)

type Config struct {
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
Type DiscoveryType `json:"type" pflag:"\"noop\", Catalog Implementation to use"`
Endpoint string `json:"endpoint" pflag:"\"\", Endpoint for catalog service"`
Insecure bool `json:"insecure" pflag:"false, Use insecure grpc connection"`
MaxCacheAge config.Duration `json:"max-cache-age" pflag:", Cache entries past this age will incur cache miss. 0 means cache never expires"`
}

// Gets loaded config for Discovery
Expand All @@ -45,7 +46,7 @@ func NewCatalogClient(ctx context.Context) (catalog.Client, error) {

switch catalogConfig.Type {
case DataCatalogType:
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure)
return datacatalog.NewDataCatalog(ctx, catalogConfig.Endpoint, catalogConfig.Insecure, catalogConfig.MaxCacheAge.Duration)
case NoOpDiscoveryType, "":
return NOOPCatalog{}, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/task/catalog/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions pkg/controller/nodes/task/catalog/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 22 additions & 3 deletions pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils"
"github.com/pkg/errors"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flytestdlib/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand All @@ -32,7 +33,8 @@ var (

// This is the client that caches task executions to DataCatalog service.
type CatalogClient struct {
client datacatalog.DataCatalogClient
client datacatalog.DataCatalogClient
maxCacheAge time.Duration
}

// Helper method to retrieve a dataset that is associated with the task
Expand Down Expand Up @@ -69,6 +71,22 @@ func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, da
return nil, err
}

// check artifact's age if the configuration specifies a max age
if m.maxCacheAge > time.Duration(0) {
artifact := response.Artifact
createdAt, err := ptypes.Timestamp(artifact.CreatedAt)
if err != nil {
logger.Errorf(ctx, "DataCatalog Artifact has invalid createdAt %+v, err: %+v", artifact.CreatedAt, err)
return nil, err
}

if time.Since(createdAt) > m.maxCacheAge {
logger.Warningf(ctx, "Expired Cached Artifact %v created on %v, older than max age %v",
artifact.Id, createdAt.String(), m.maxCacheAge)
return nil, status.Error(codes.NotFound, "Artifact over age limit")
}
}

return response.Artifact, nil
}

Expand Down Expand Up @@ -254,7 +272,7 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
}

// Create a new Datacatalog client for task execution caching
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool) (*CatalogClient, error) {
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration) (*CatalogClient, error) {
var opts []grpc.DialOption

grpcOptions := []grpcRetry.CallOption{
Expand Down Expand Up @@ -288,6 +306,7 @@ func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection boo
client := datacatalog.NewDataCatalogClient(clientConn)

return &CatalogClient{
client: client,
client: client,
maxCacheAge: maxCacheAge,
}, nil
}
111 changes: 107 additions & 4 deletions pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"time"

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/catalog/datacatalog/mocks"
)

Expand Down Expand Up @@ -90,6 +93,11 @@ func TestCatalog_Get(t *testing.T) {

ctx := context.Background()

sampleArtifactData := &datacatalog.ArtifactData{
Name: "test",
Value: newStringLiteral("output1-stringval"),
}

t.Run("No results, no Dataset", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(newStringLiteral("output"), nil, nil)
Expand Down Expand Up @@ -169,10 +177,6 @@ func TestCatalog_Get(t *testing.T) {
}),
).Return(&datacatalog.GetDatasetResponse{Dataset: sampleDataSet}, nil)

sampleArtifactData := &datacatalog.ArtifactData{
Name: "test",
Value: newStringLiteral("output1-stringval"),
}
sampleArtifact := &datacatalog.Artifact{
Id: "test-artifact",
Dataset: sampleDataSet.Id,
Expand All @@ -194,6 +198,102 @@ func TestCatalog_Get(t *testing.T) {
assert.NotNil(t, resp)
})

t.Run("Found expired artifact", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
catalogClient := &CatalogClient{
client: mockClient,
maxCacheAge: time.Hour,
}

sampleDataSet := &datacatalog.Dataset{
Id: datasetID,
}

mockClient.On("GetDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.GetDatasetRequest) bool {
assert.EqualValues(t, datasetID, o.Dataset)
return true
}),
).Return(&datacatalog.GetDatasetResponse{Dataset: sampleDataSet}, nil)
createdAt, err := ptypes.TimestampProto(time.Now().Add(time.Minute * -61))
assert.NoError(t, err)

sampleArtifact := &datacatalog.Artifact{
Id: "test-artifact",
Dataset: sampleDataSet.Id,
Data: []*datacatalog.ArtifactData{sampleArtifactData},
CreatedAt: createdAt,
}
mockClient.On("GetArtifact",
ctx,
mock.MatchedBy(func(o *datacatalog.GetArtifactRequest) bool {
assert.EqualValues(t, datasetID, o.Dataset)
assert.Equal(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.GetTagName())
return true
}),
).Return(&datacatalog.GetArtifactResponse{Artifact: sampleArtifact}, nil)

newKey := sampleKey
newKey.InputReader = ir
resp, err := catalogClient.Get(ctx, newKey)
assert.Error(t, err)
assert.Nil(t, resp)

getStatus, ok := status.FromError(err)
assert.True(t, ok)
assert.Equal(t, getStatus.Code(), codes.NotFound)
})

t.Run("Found non-expired artifact", func(t *testing.T) {
ir := &mocks2.InputReader{}
ir.On("Get", mock.Anything).Return(sampleParameters, nil, nil)

mockClient := &mocks.DataCatalogClient{}
catalogClient := &CatalogClient{
client: mockClient,
maxCacheAge: time.Hour,
}

sampleDataSet := &datacatalog.Dataset{
Id: datasetID,
}

mockClient.On("GetDataset",
ctx,
mock.MatchedBy(func(o *datacatalog.GetDatasetRequest) bool {
assert.EqualValues(t, datasetID, o.Dataset)
return true
}),
).Return(&datacatalog.GetDatasetResponse{Dataset: sampleDataSet}, nil)
createdAt, err := ptypes.TimestampProto(time.Now().Add(time.Minute * -59))
assert.NoError(t, err)

sampleArtifact := &datacatalog.Artifact{
Id: "test-artifact",
Dataset: sampleDataSet.Id,
Data: []*datacatalog.ArtifactData{sampleArtifactData},
CreatedAt: createdAt,
}
mockClient.On("GetArtifact",
ctx,
mock.MatchedBy(func(o *datacatalog.GetArtifactRequest) bool {
assert.EqualValues(t, datasetID, o.Dataset)
assert.Equal(t, "flyte_cached-BE6CZsMk6N3ExR_4X9EuwBgj2Jh2UwasXK3a_pM9xlY", o.GetTagName())
return true
}),
).Return(&datacatalog.GetArtifactResponse{Artifact: sampleArtifact}, nil)

newKey := sampleKey
newKey.InputReader = ir
resp, err := catalogClient.Get(ctx, newKey)
assert.NoError(t, err)
assert.NotNil(t, resp)
})

t.Run("Found w/ tag no inputs or outputs", func(t *testing.T) {
mockClient := &mocks.DataCatalogClient{}
discovery := &CatalogClient{
Expand Down Expand Up @@ -231,9 +331,12 @@ func TestCatalog_Get(t *testing.T) {
}),
).Return(&datacatalog.GetArtifactResponse{Artifact: sampleArtifact}, nil)

assert.False(t, discovery.maxCacheAge > time.Duration(0))

resp, err := discovery.Get(ctx, noInputOutputKey)
assert.NoError(t, err)
assert.NotNil(t, resp)

v, e, err := resp.Read(ctx)
assert.NoError(t, err)
assert.Nil(t, e)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 04b5a13

Please sign in to comment.