From 9ac6eeb4fce5672653009a159336e0e08d26cf9b Mon Sep 17 00:00:00 2001 From: Ketan Umare Date: Tue, 28 Jul 2020 16:36:12 -0700 Subject: [PATCH] feat; catalog api update to return additional information for catalog (#105) * Updating Core catalog api to return more information to the engine * use latest * Go modules update * lint fixed * FlyteIDL updated v0.17.38 --- copilot/go.mod | 2 +- copilot/go.sum | 2 + go.mod | 2 +- go.sum | 8 ++++ go/tasks/pluginmachinery/catalog/client.go | 48 +++++++++++++++++-- .../pluginmachinery/catalog/mocks/client.go | 33 +++++++------ .../catalog/reader_processor.go | 13 +++-- .../catalog/writer_processor.go | 13 +++-- 8 files changed, 95 insertions(+), 26 deletions(-) diff --git a/copilot/go.mod b/copilot/go.mod index 5e85990a3c..8e045082fa 100644 --- a/copilot/go.mod +++ b/copilot/go.mod @@ -7,7 +7,7 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.4.2 github.com/imdario/mergo v0.3.9 // indirect - github.com/lyft/flyteidl v0.17.32 + github.com/lyft/flyteidl v0.17.38 github.com/lyft/flyteplugins v0.3.29 github.com/lyft/flytestdlib v0.3.9 github.com/mitchellh/go-ps v1.0.0 diff --git a/copilot/go.sum b/copilot/go.sum index 6b6d36e828..0198b21bf3 100644 --- a/copilot/go.sum +++ b/copilot/go.sum @@ -283,6 +283,8 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0Au github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ= github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U= github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg= +github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.29 h1:NN88yXv6sTouMVwQEgbP0A6k+uznGr00ZcKP6ZFUPrU= github.com/lyft/flyteplugins v0.3.29/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/go.mod b/go.mod index 741077a22c..64e34fa0d5 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/kubeflow/pytorch-operator v0.6.0 github.com/kubeflow/tf-operator v0.5.3 - github.com/lyft/flyteidl v0.17.32 + github.com/lyft/flyteidl v0.17.38 github.com/lyft/flytestdlib v0.3.9 github.com/magiconair/properties v1.8.1 github.com/mitchellh/go-ps v1.0.0 diff --git a/go.sum b/go.sum index 4c11fec80a..7c109c80b9 100644 --- a/go.sum +++ b/go.sum @@ -312,6 +312,14 @@ github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/ github.com/lyft/flyteidl v0.17.29/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U= github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.35-0.20200707003420-954dedb491fa h1:E+kaqlpuvc2h9m4Fw7yPQjOGjx0Hi7XOPT8KI55Im8A= +github.com/lyft/flyteidl v0.17.35-0.20200707003420-954dedb491fa/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce h1:0yFcmwunllOOdjW8d7+BA6fwQzNYbzrefbbh3dfTHcg= +github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.37 h1:2fyymCrLZWkdolznu91RGAhLmnqcFBC/MzOZ7mNgOTE= +github.com/lyft/flyteidl v0.17.37/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg= +github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= diff --git a/go/tasks/pluginmachinery/catalog/client.go b/go/tasks/pluginmachinery/catalog/client.go index 533491c27f..e58ee6501d 100644 --- a/go/tasks/pluginmachinery/catalog/client.go +++ b/go/tasks/pluginmachinery/catalog/client.go @@ -5,7 +5,7 @@ import ( "fmt" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + grpcStatus "google.golang.org/grpc/status" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -33,13 +33,53 @@ func (k Key) String() string { return fmt.Sprintf("%v:%v", k.Identifier, k.CacheVersion) } +// Indicates that status of the query to Catalog. This can be returned for both Get and Put calls +type Status struct { + cacheStatus core.CatalogCacheStatus + metadata *core.CatalogMetadata +} + +func (s Status) GetCacheStatus() core.CatalogCacheStatus { + return s.cacheStatus +} + +func (s Status) GetMetadata() *core.CatalogMetadata { + return s.metadata +} + +func NewStatus(cacheStatus core.CatalogCacheStatus, md *core.CatalogMetadata) Status { + return Status{cacheStatus: cacheStatus, metadata: md} +} + +// Indicates the Entry in Catalog that was populated +type Entry struct { + outputs io.OutputReader + status Status +} + +func (e Entry) GetOutputs() io.OutputReader { + return e.outputs +} + +func (e Entry) GetStatus() Status { + return e.status +} + +func NewFailedCatalogEntry(status Status) Entry { + return Entry{status: status} +} + +func NewCatalogEntry(outputs io.OutputReader, status Status) Entry { + return Entry{outputs: outputs, status: status} +} + // Default Catalog client that allows memoization and indexing of intermediate data in Flyte type Client interface { - Get(ctx context.Context, key Key) (io.OutputReader, error) - Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) error + Get(ctx context.Context, key Key) (Entry, error) + Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error) } func IsNotFound(err error) bool { - taskStatus, ok := status.FromError(err) + taskStatus, ok := grpcStatus.FromError(err) return ok && taskStatus.Code() == codes.NotFound } diff --git a/go/tasks/pluginmachinery/catalog/mocks/client.go b/go/tasks/pluginmachinery/catalog/mocks/client.go index d3bedd110d..007a4723cf 100644 --- a/go/tasks/pluginmachinery/catalog/mocks/client.go +++ b/go/tasks/pluginmachinery/catalog/mocks/client.go @@ -21,7 +21,7 @@ type Client_Get struct { *mock.Call } -func (_m Client_Get) Return(_a0 io.OutputReader, _a1 error) *Client_Get { +func (_m Client_Get) Return(_a0 catalog.Entry, _a1 error) *Client_Get { return &Client_Get{Call: _m.Call.Return(_a0, _a1)} } @@ -36,16 +36,14 @@ func (_m *Client) OnGetMatch(matchers ...interface{}) *Client_Get { } // Get provides a mock function with given fields: ctx, key -func (_m *Client) Get(ctx context.Context, key catalog.Key) (io.OutputReader, error) { +func (_m *Client) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error) { ret := _m.Called(ctx, key) - var r0 io.OutputReader - if rf, ok := ret.Get(0).(func(context.Context, catalog.Key) io.OutputReader); ok { + var r0 catalog.Entry + if rf, ok := ret.Get(0).(func(context.Context, catalog.Key) catalog.Entry); ok { r0 = rf(ctx, key) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(io.OutputReader) - } + r0 = ret.Get(0).(catalog.Entry) } var r1 error @@ -62,8 +60,8 @@ type Client_Put struct { *mock.Call } -func (_m Client_Put) Return(_a0 error) *Client_Put { - return &Client_Put{Call: _m.Call.Return(_a0)} +func (_m Client_Put) Return(_a0 catalog.Status, _a1 error) *Client_Put { + return &Client_Put{Call: _m.Call.Return(_a0, _a1)} } func (_m *Client) OnPut(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) *Client_Put { @@ -77,15 +75,22 @@ func (_m *Client) OnPutMatch(matchers ...interface{}) *Client_Put { } // Put provides a mock function with given fields: ctx, key, reader, metadata -func (_m *Client) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) error { +func (_m *Client) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) { ret := _m.Called(ctx, key, reader, metadata) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, catalog.Key, io.OutputReader, catalog.Metadata) error); ok { + var r0 catalog.Status + if rf, ok := ret.Get(0).(func(context.Context, catalog.Key, io.OutputReader, catalog.Metadata) catalog.Status); ok { r0 = rf(ctx, key, reader, metadata) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(catalog.Status) } - return r0 + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, catalog.Key, io.OutputReader, catalog.Metadata) error); ok { + r1 = rf(ctx, key, reader, metadata) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } diff --git a/go/tasks/pluginmachinery/catalog/reader_processor.go b/go/tasks/pluginmachinery/catalog/reader_processor.go index 0875df7dd4..3fa81cd1e4 100644 --- a/go/tasks/pluginmachinery/catalog/reader_processor.go +++ b/go/tasks/pluginmachinery/catalog/reader_processor.go @@ -5,11 +5,14 @@ import ( "fmt" "reflect" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flyteplugins/go/tasks/errors" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" ) @@ -57,14 +60,18 @@ func (p ReaderProcessor) Process(ctx context.Context, workItem workqueue.WorkIte return workqueue.WorkStatusFailed, err } - if op == nil { + if op.status.GetCacheStatus() == core.CatalogCacheStatus_CACHE_LOOKUP_FAILURE { + return workqueue.WorkStatusFailed, errors.Errorf(errors.DownstreamSystemError, "failed to lookup cache") + } + + if op.status.GetCacheStatus() == core.CatalogCacheStatus_CACHE_MISS || op.GetOutputs() == nil { wi.cached = false return workqueue.WorkStatusSucceeded, nil } // TODO: Check task interface, if it has outputs but literalmap is empty (or not matching output), error. logger.Debugf(ctx, "Persisting output to %v", wi.outputsWriter.GetOutputPath()) - err = wi.outputsWriter.Put(ctx, op) + err = wi.outputsWriter.Put(ctx, op.GetOutputs()) if err != nil { err = errors.Wrapf("CausedBy", err, "Failed to persist cached output for Key: %v.", wi.key) logger.Warnf(ctx, "Cache write to output writer failed: %v", err) diff --git a/go/tasks/pluginmachinery/catalog/writer_processor.go b/go/tasks/pluginmachinery/catalog/writer_processor.go index aa9a1e4df9..80dc46e4fc 100644 --- a/go/tasks/pluginmachinery/catalog/writer_processor.go +++ b/go/tasks/pluginmachinery/catalog/writer_processor.go @@ -5,10 +5,12 @@ import ( "fmt" "reflect" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/logger" + "github.com/lyft/flyteplugins/go/tasks/errors" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue" - "github.com/lyft/flytestdlib/logger" ) type WriterWorkItem struct { @@ -36,7 +38,7 @@ func (p writerProcessor) Process(ctx context.Context, workItem workqueue.WorkIte return workqueue.WorkStatusNotDone, fmt.Errorf("wrong work item type. Received: %v", reflect.TypeOf(workItem)) } - err := p.catalogClient.Put(ctx, wi.key, wi.data, wi.metadata) + status, err := p.catalogClient.Put(ctx, wi.key, wi.data, wi.metadata) if err != nil { logger.Errorf(ctx, "Error putting to catalog [%s]", err) return workqueue.WorkStatusNotDone, errors.Wrapf(errors.DownstreamSystemError, err, @@ -44,8 +46,13 @@ func (p writerProcessor) Process(ctx context.Context, workItem workqueue.WorkIte wi.key.Identifier, wi.key.CacheVersion) } - logger.Debugf(ctx, "Successfully wrote to catalog. Key [%v]", wi.key) + if status.GetCacheStatus() == core.CatalogCacheStatus_CACHE_PUT_FAILURE { + return workqueue.WorkStatusNotDone, errors.Errorf(errors.DownstreamSystemError, + "Error writing to catalog, key id [%v] cache version [%v]", + wi.key.Identifier, wi.key.CacheVersion) + } + logger.Debugf(ctx, "Successfully wrote to catalog. Key [%v]", wi.key) return workqueue.WorkStatusSucceeded, nil }