Skip to content

Commit

Permalink
feat; catalog api update to return additional information for catalog (
Browse files Browse the repository at this point in the history
…flyteorg#105)

* Updating Core catalog api to return more information to the engine

* use latest

* Go modules update

* lint fixed

* FlyteIDL updated v0.17.38
  • Loading branch information
Ketan Umare authored Jul 28, 2020
1 parent 603aff9 commit 2749f8f
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 26 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/copilot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.4.2
github.com/imdario/mergo v0.3.9 // indirect
github.com/lyft/flyteidl v0.17.32
github.com/lyft/flyteidl v0.17.38
github.com/lyft/flyteplugins v0.3.29
github.com/lyft/flytestdlib v0.3.9
github.com/mitchellh/go-ps v1.0.0
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/copilot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0Au
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U=
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg=
github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.29 h1:NN88yXv6sTouMVwQEgbP0A6k+uznGr00ZcKP6ZFUPrU=
github.com/lyft/flyteplugins v0.3.29/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/kubeflow/pytorch-operator v0.6.0
github.com/kubeflow/tf-operator v0.5.3
github.com/lyft/flyteidl v0.17.32
github.com/lyft/flyteidl v0.17.38
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
github.com/mitchellh/go-ps v1.0.0
Expand Down
8 changes: 8 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,14 @@ github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/
github.com/lyft/flyteidl v0.17.29/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.32 h1:Iio3gYjTyPhAiOMWJ/H/4YtfWIZm5KZSlWMULT1Ef6U=
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.35-0.20200707003420-954dedb491fa h1:E+kaqlpuvc2h9m4Fw7yPQjOGjx0Hi7XOPT8KI55Im8A=
github.com/lyft/flyteidl v0.17.35-0.20200707003420-954dedb491fa/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce h1:0yFcmwunllOOdjW8d7+BA6fwQzNYbzrefbbh3dfTHcg=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.37 h1:2fyymCrLZWkdolznu91RGAhLmnqcFBC/MzOZ7mNgOTE=
github.com/lyft/flyteidl v0.17.37/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg=
github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
48 changes: 44 additions & 4 deletions flyteplugins/go/tasks/pluginmachinery/catalog/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
grpcStatus "google.golang.org/grpc/status"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

Expand Down Expand Up @@ -33,13 +33,53 @@ func (k Key) String() string {
return fmt.Sprintf("%v:%v", k.Identifier, k.CacheVersion)
}

// Indicates that status of the query to Catalog. This can be returned for both Get and Put calls
type Status struct {
cacheStatus core.CatalogCacheStatus
metadata *core.CatalogMetadata
}

func (s Status) GetCacheStatus() core.CatalogCacheStatus {
return s.cacheStatus
}

func (s Status) GetMetadata() *core.CatalogMetadata {
return s.metadata
}

func NewStatus(cacheStatus core.CatalogCacheStatus, md *core.CatalogMetadata) Status {
return Status{cacheStatus: cacheStatus, metadata: md}
}

// Indicates the Entry in Catalog that was populated
type Entry struct {
outputs io.OutputReader
status Status
}

func (e Entry) GetOutputs() io.OutputReader {
return e.outputs
}

func (e Entry) GetStatus() Status {
return e.status
}

func NewFailedCatalogEntry(status Status) Entry {
return Entry{status: status}
}

func NewCatalogEntry(outputs io.OutputReader, status Status) Entry {
return Entry{outputs: outputs, status: status}
}

// Default Catalog client that allows memoization and indexing of intermediate data in Flyte
type Client interface {
Get(ctx context.Context, key Key) (io.OutputReader, error)
Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) error
Get(ctx context.Context, key Key) (Entry, error)
Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error)
}

func IsNotFound(err error) bool {
taskStatus, ok := status.FromError(err)
taskStatus, ok := grpcStatus.FromError(err)
return ok && taskStatus.Code() == codes.NotFound
}
33 changes: 19 additions & 14 deletions flyteplugins/go/tasks/pluginmachinery/catalog/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/catalog/reader_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"fmt"
"reflect"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

"github.com/lyft/flyteplugins/go/tasks/errors"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue"
)

Expand Down Expand Up @@ -57,14 +60,18 @@ func (p ReaderProcessor) Process(ctx context.Context, workItem workqueue.WorkIte
return workqueue.WorkStatusFailed, err
}

if op == nil {
if op.status.GetCacheStatus() == core.CatalogCacheStatus_CACHE_LOOKUP_FAILURE {
return workqueue.WorkStatusFailed, errors.Errorf(errors.DownstreamSystemError, "failed to lookup cache")
}

if op.status.GetCacheStatus() == core.CatalogCacheStatus_CACHE_MISS || op.GetOutputs() == nil {
wi.cached = false
return workqueue.WorkStatusSucceeded, nil
}

// TODO: Check task interface, if it has outputs but literalmap is empty (or not matching output), error.
logger.Debugf(ctx, "Persisting output to %v", wi.outputsWriter.GetOutputPath())
err = wi.outputsWriter.Put(ctx, op)
err = wi.outputsWriter.Put(ctx, op.GetOutputs())
if err != nil {
err = errors.Wrapf("CausedBy", err, "Failed to persist cached output for Key: %v.", wi.key)
logger.Warnf(ctx, "Cache write to output writer failed: %v", err)
Expand Down
13 changes: 10 additions & 3 deletions flyteplugins/go/tasks/pluginmachinery/catalog/writer_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"fmt"
"reflect"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteplugins/go/tasks/errors"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/workqueue"
"github.com/lyft/flytestdlib/logger"
)

type WriterWorkItem struct {
Expand Down Expand Up @@ -36,16 +38,21 @@ func (p writerProcessor) Process(ctx context.Context, workItem workqueue.WorkIte
return workqueue.WorkStatusNotDone, fmt.Errorf("wrong work item type. Received: %v", reflect.TypeOf(workItem))
}

err := p.catalogClient.Put(ctx, wi.key, wi.data, wi.metadata)
status, err := p.catalogClient.Put(ctx, wi.key, wi.data, wi.metadata)
if err != nil {
logger.Errorf(ctx, "Error putting to catalog [%s]", err)
return workqueue.WorkStatusNotDone, errors.Wrapf(errors.DownstreamSystemError, err,
"Error writing to catalog, key id [%v] cache version [%v]",
wi.key.Identifier, wi.key.CacheVersion)
}

logger.Debugf(ctx, "Successfully wrote to catalog. Key [%v]", wi.key)
if status.GetCacheStatus() == core.CatalogCacheStatus_CACHE_PUT_FAILURE {
return workqueue.WorkStatusNotDone, errors.Errorf(errors.DownstreamSystemError,
"Error writing to catalog, key id [%v] cache version [%v]",
wi.key.Identifier, wi.key.CacheVersion)
}

logger.Debugf(ctx, "Successfully wrote to catalog. Key [%v]", wi.key)
return workqueue.WorkStatusSucceeded, nil
}

Expand Down

0 comments on commit 2749f8f

Please sign in to comment.