Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

feat; Catalog Information including Caching published to FlyteAdmin #167

Merged
merged 12 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ require (
github.com/imdario/mergo v0.3.8 // indirect
github.com/jmespath/go-jmespath v0.3.0 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.17.34
github.com/lyft/flyteplugins v0.3.35
github.com/lyft/flyteidl v0.17.38
github.com/lyft/flyteplugins v0.3.38
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.6 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,23 @@ github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/
github.com/lyft/flyteidl v0.17.32/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.34 h1:8ERT/8vY40dOPPJrdD8ossBb30WkvzUx/IAFMR/7+9U=
github.com/lyft/flyteidl v0.17.34/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.35-0.20200707003420-954dedb491fa/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce h1:0yFcmwunllOOdjW8d7+BA6fwQzNYbzrefbbh3dfTHcg=
github.com/lyft/flyteidl v0.17.37-0.20200721225522-b4751137e5ce/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.38 h1:fAbIzyRvBvMMe5wC7qEjD2ehPlPhQCFu5G4eskPezcg=
github.com/lyft/flyteidl v0.17.38/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.3.34/go.mod h1:HHO6KC/2z77n9o9KM697YvSP85IWDe6jl6tAIrMLqWU=
github.com/lyft/flyteplugins v0.3.35 h1:9s2BrJ82RoTJa1Cy02vqQy+ajxS+d4MQAkuUFoaiCuQ=
github.com/lyft/flyteplugins v0.3.35/go.mod h1:Dk9rnPCbgR7rC9dNM49260TQ51TvRsljDBJ6uBjZ9ys=
github.com/lyft/flyteplugins v0.3.38-0.20200724213302-8d841a7bfa4b h1:Pr2E7TQjFeZHQyE5WhEioCH5NLg80H290pNg+1lLmq4=
github.com/lyft/flyteplugins v0.3.38-0.20200724213302-8d841a7bfa4b/go.mod h1:RPAS1gST3UAp+X5i2sMavkQBAMSXBoGbh2THiJoJCQc=
github.com/lyft/flyteplugins v0.3.38-0.20200724215013-9d75d41ef924 h1:wVBgHarQ6aUIyPLyuQWN+kRIz7SW07SXCCV06fbIJb8=
github.com/lyft/flyteplugins v0.3.38-0.20200724215013-9d75d41ef924/go.mod h1:RPAS1gST3UAp+X5i2sMavkQBAMSXBoGbh2THiJoJCQc=
github.com/lyft/flyteplugins v0.3.38 h1:3yDKZ9raVV6oHBY+enRhw0CbdL1gD3KEbqEaDUlFbkU=
github.com/lyft/flyteplugins v0.3.38/go.mod h1:Vb5ZJ9uq1u3tQbAGGHOFsrj08pvS+ukpMtLAl99HAEI=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.3/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.7/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.9 h1:NaKp9xkeWWwhVvqTOcR/FqlASy1N2gu/kN7PVe4S7YI=
github.com/lyft/flytestdlib v0.3.9/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
Expand Down Expand Up @@ -33,7 +34,7 @@ const dynamicNodeID = "dynamic-node"
type TaskNodeHandler interface {
handler.Node
ValidateOutputAndCacheAdd(ctx context.Context, nodeID v1alpha1.NodeID, i io.InputReader, r io.OutputReader, outputCommitter io.OutputWriter,
tr pluginCore.TaskReader, m catalog.Metadata) (*io.ExecutionError, error)
tr pluginCore.TaskReader, m catalog.Metadata) (catalog.Status, *io.ExecutionError, error)
}

type metrics struct {
Expand Down Expand Up @@ -115,7 +116,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n
outputPaths := ioutils.NewRemoteFileOutputPaths(ctx, nCtx.DataStore(), nCtx.NodeStatus().GetOutputDir(), nil)
execID := task.GetTaskExecutionIdentifier(nCtx)
outputReader := ioutils.NewRemoteFileOutputReader(ctx, nCtx.DataStore(), outputPaths, nCtx.MaxDatasetSizeBytes())
ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{
status, ee, err := d.TaskNodeHandler.ValidateOutputAndCacheAdd(ctx, nCtx.NodeID(), nCtx.InputReader(), outputReader, nil, nCtx.TaskReader(), catalog.Metadata{
TaskExecutionIdentifier: execID,
})

Expand All @@ -130,6 +131,7 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n

return trns.WithInfo(handler.PhaseInfoFailureErr(ee.ExecutionError, trns.Info().GetInfo())), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: ee.ExecutionError.String()}, nil
}
trns.WithInfo(trns.Info().WithInfo(&handler.ExecutionInfo{TaskNodeInfo: &handler.TaskNodeInfo{TaskNodeMetadata: &event.TaskNodeMetadata{CacheStatus: status.GetCacheStatus(), CatalogKey: status.GetMetadata()}}}))
}

return trns, newState, nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/nodes/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"

Expand Down Expand Up @@ -542,9 +543,9 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) {
mockLPLauncher := &lpMocks.Reader{}
h := &mocks.TaskNodeHandler{}
if tt.args.validErr != nil {
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.args.validErr, nil)
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), tt.args.validErr, nil)
} else {
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
h.OnValidateOutputAndCacheAddMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, &core.CatalogMetadata{ArtifactTag: &core.CatalogArtifactTag{Name: "name", ArtifactId: "id"}}), nil, nil)
}
n := &executorMocks.Node{}
if tt.args.isErr {
Expand Down
31 changes: 19 additions & 12 deletions pkg/controller/nodes/dynamic/mocks/task_node_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 13 additions & 2 deletions pkg/controller/nodes/handler/transition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/event"
"github.com/lyft/flytestdlib/storage"
)

Expand Down Expand Up @@ -41,9 +42,9 @@ type WorkflowNodeInfo struct {
type BranchNodeInfo struct {
}

// Carries any information that should be sent as part of NodeEvents
type TaskNodeInfo struct {
CacheHit bool
// TaskPhase etc
TaskNodeMetadata *event.TaskNodeMetadata
}

type OutputInfo struct {
Expand Down Expand Up @@ -86,6 +87,16 @@ func (p PhaseInfo) GetReason() string {
return p.reason
}

func (p PhaseInfo) WithInfo(i *ExecutionInfo) PhaseInfo {
return PhaseInfo{
p: p.p,
occurredAt: p.occurredAt,
err: p.err,
info: i,
reason: p.reason,
}
}

var PhaseInfoUndefined = PhaseInfo{p: EPhaseUndefined}

func phaseInfo(p EPhase, err *core.ExecutionError, info *ExecutionInfo, reason string) PhaseInfo {
Expand Down
59 changes: 26 additions & 33 deletions pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
)

const (
taskVersionKey = "task-version"
wfExecNameKey = "execution-name"
)

var (
_ catalog.Client = &CatalogClient{}
)
Expand Down Expand Up @@ -95,43 +90,50 @@ func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, da
// - Verify there is a Dataset created for the Task
// - Lookup the Artifact that is tagged with the hash of the input values
// - The artifactData contains the literal values that serve as the task outputs
func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (io.OutputReader, error) {
func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error) {
dataset, err := m.GetDataset(ctx, key)
if err != nil {
logger.Debugf(ctx, "DataCatalog failed to get dataset for ID %s, err: %+v", key.Identifier.String(), err)
return nil, errors.Wrapf(err, "DataCatalog failed to get dataset for ID %s", key.Identifier.String())
return catalog.Entry{}, errors.Wrapf(err, "DataCatalog failed to get dataset for ID %s", key.Identifier.String())
}

inputs := &core.LiteralMap{}
if key.TypedInterface.Inputs != nil {
retInputs, err := key.InputReader.Get(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to read inputs when trying to query catalog")
return catalog.Entry{}, errors.Wrap(err, "failed to read inputs when trying to query catalog")
}
inputs = retInputs
}

tag, err := GenerateArtifactTagName(ctx, inputs)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to generate tag for inputs %+v, err: %+v", inputs, err)
return nil, err
return catalog.Entry{}, err
}

artifact, err := m.GetArtifactByTag(ctx, tag, dataset)
if err != nil {
logger.Debugf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err)
return nil, err
return catalog.Entry{}, err
}
logger.Debugf(ctx, "Artifact found %v from tag %v", artifact, tag)

var relevantTag *datacatalog.Tag
if len(artifact.GetTags()) > 0 {
// TODO should we look through all the tags to find the relevant one?
relevantTag = artifact.GetTags()[0]
}
md := EventCatalogMetadata(dataset.GetId(), relevantTag, GetSourceFromMetadata(dataset.GetMetadata(), artifact.GetMetadata(), key.Identifier))

outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err)
return nil, err
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err
}

logger.Infof(ctx, "Retrieved %v outputs from artifact %v, tag: %v", len(outputs.Literals), artifact.Id, tag)
return ioutils.NewInMemoryOutputReader(outputs, nil), nil
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil
}

func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error) {
Expand Down Expand Up @@ -195,21 +197,12 @@ func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatal
// - Ensure a Dataset exists for the Artifact. The Dataset represents the proj/domain/name/version of the task
// - Create an Artifact with the execution data that belongs to the dataset
// - Tag the Artifact with a hash generated by the input values
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) error {

// Try creating the dataset in case it doesn't exist
md := &datacatalog.Metadata{
KeyMap: map[string]string{
taskVersionKey: key.Identifier.Version,
},
}
if metadata.WorkflowExecutionIdentifier != nil {
md.KeyMap[wfExecNameKey] = metadata.WorkflowExecutionIdentifier.Name
}
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error) {

datasetID, err := m.CreateDataset(ctx, key, md)
// Populate Metadata for later recovery
datasetID, err := m.CreateDataset(ctx, key, GetDatasetMetadataForSource(metadata.TaskExecutionIdentifier))
if err != nil {
return err
return catalog.Status{}, err
}

inputs := &core.LiteralMap{}
Expand All @@ -218,7 +211,7 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
retInputs, err := key.InputReader.Get(ctx)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to read inputs err: %s", err)
return err
return catalog.Status{}, err
}
logger.Debugf(ctx, "DataCatalog read inputs")
inputs = retInputs
Expand All @@ -228,27 +221,27 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
retOutputs, retErr, err := reader.Read(ctx)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to read outputs err: %s", err)
return err
return catalog.Status{}, err
}
if retErr != nil {
logger.Errorf(ctx, "DataCatalog failed to read outputs, err :%s", retErr.Message)
return errors.Errorf("Failed to read outputs. EC: %s, Msg: %s", retErr.Code, retErr.Message)
return catalog.Status{}, errors.Errorf("Failed to read outputs. EC: %s, Msg: %s", retErr.Code, retErr.Message)
}
logger.Debugf(ctx, "DataCatalog read outputs")
outputs = retOutputs
}

// Create the artifact for the execution that belongs in the task
cachedArtifact, err := m.CreateArtifact(ctx, datasetID, outputs, md)
cachedArtifact, err := m.CreateArtifact(ctx, datasetID, outputs, GetArtifactMetadataForSource(metadata.TaskExecutionIdentifier))
if err != nil {
return errors.Wrapf(err, "failed to create dataset for ID %s", key.Identifier.String())
return catalog.Status{}, errors.Wrapf(err, "failed to create dataset for ID %s", key.Identifier.String())
}

// Tag the artifact since it is the cached artifact
tagName, err := GenerateArtifactTagName(ctx, inputs)
if err != nil {
logger.Errorf(ctx, "Failed to generate tag for artifact %+v, err: %+v", cachedArtifact.Id, err)
return err
return catalog.Status{}, err
}
logger.Infof(ctx, "Cached exec tag: %v, task: %v", tagName, key.Identifier)

Expand All @@ -264,11 +257,11 @@ func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.Outp
logger.Warnf(ctx, "Tag %v already exists for Artifact %v (idempotent)", tagName, cachedArtifact.Id)
} else {
logger.Errorf(ctx, "Failed to add tag %+v for artifact %+v, err: %+v", tagName, cachedArtifact.Id, err)
return err
return catalog.Status{}, err
}
}

return nil
return catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, EventCatalogMetadata(datasetID, tag, nil)), nil
}

// Create a new Datacatalog client for task execution caching
Expand Down
Loading