From 55b0708ac3523b855eb521e618ca9a46dfce7164 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 27 Mar 2023 12:41:20 -0700 Subject: [PATCH 01/15] temp code, switching to something else Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 86 +++++++++++++++++++++++++++++++++++++++ dataproxy/service_test.go | 15 +++++++ go.mod | 1 + 3 files changed, 102 insertions(+) diff --git a/dataproxy/service.go b/dataproxy/service.go index 948c6a25c..65c4c6f21 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -5,8 +5,12 @@ import ( "encoding/base32" "encoding/base64" "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "net/url" "reflect" + "regexp" + "strconv" + "strings" "time" "github.com/flyteorg/flyteadmin/pkg/errors" @@ -231,6 +235,88 @@ func createStorageLocation(ctx context.Context, store *storage.DataStore, return storagePath, nil } +func (s Service) validateResolveArtifactRequest(req *service.ResolveArtifactRequest) error { + if req.GetFlyteUrl() == "" { + return fmt.Errorf("source is required. Provided empty string") + } + if !strings.HasPrefix(req.GetFlyteUrl(), "flyte://") { + return fmt.Errorf("request does not start with the correct prefix") + } + + return nil +} + +type IOType int + +const ( + INPUT = iota + OUTPUT + DECK +) + +func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, int, IOType, error) { + // flyteUrl is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] + // where i stands for inputs.pb o for outputs.pb and d for the flyte deck + re, err := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([0-9]+)/[iod]") + if err != nil { + return core.NodeExecutionIdentifier{}, 0, err + } + re.MatchString(flyteUrl) + matches := re.FindStringSubmatch(flyteUrl) + if len(matches) != 7 { + return core.NodeExecutionIdentifier{}, 0, 0, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) + } + proj := matches[1] + domain := matches[2] + executionId := matches[3] + nodeId := matches[4] + attempt, err := strconv.Atoi(matches[5]) + if err != nil { + return core.NodeExecutionIdentifier{}, 0, 0, fmt.Errorf("failed to parse attempt, %s", err) + } + var ioType IOType + switch matches[6] { + case "i": + ioType = INPUT + case "o": + ioType = OUTPUT + case "d": + ioType = DECK + } + + return core.NodeExecutionIdentifier{ + NodeId: nodeId, + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: proj, + Domain: domain, + Name: executionId, + }, + }, attempt, ioType, nil +} + +func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifactRequest) ( + *service.ResolveArtifactResponse, error) { + + fmt.Printf("+++++++++++++++++++++++++++++++ request\n%v\n", req) + fmt.Printf("extracted url query: %s\n", req.GetFlyteUrl()) + err := s.validateResolveArtifactRequest(req) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to validate resolve artifact request. Error: %v", err) + } + + // Get the node execution id and other information + nodeExecId, attempt, ioType, err := ParseFlyteUrl(req.GetFlyteUrl()) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to parse artifact url Error: %v", err) + } + + // Get the task executions for the node execution + + return &service.ResolveArtifactResponse{ + NativeUrl: "somes3link", + }, nil +} + func NewService(cfg config.DataProxyConfig, nodeExec interfaces.NodeExecutionInterface, dataStore *storage.DataStore) (Service, error) { diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index 261b0f086..0aa176ffe 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -2,6 +2,8 @@ package dataproxy import ( "context" + "fmt" + "regexp" "testing" "time" @@ -161,3 +163,16 @@ func TestCreateDownloadLocation(t *testing.T) { assert.NoError(t, err) }) } + +func TestParseFlyteUrl(t *testing.T) { + t.Run("valid", func(t *testing.T) { + re, _ := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([0-9]+)") + xx := re.FindStringSubmatch("flyte://v1/fs/dev/abc/n0/0") + fmt.Println(xx) + }) + + //t.Run("invalid", func(t *testing.T) { + // _, err := parseFlyteUrl("bucket/key") + // assert.Error(t, err) + //}) +} diff --git a/go.mod b/go.mod index 4efdaed1e..93b2855f6 100644 --- a/go.mod +++ b/go.mod @@ -209,3 +209,4 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 +replace github.com/flyteorg/flyteidl => ../flyteidl From 45279099f335ac36df89bb858ee68d8b29219498 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 27 Mar 2023 12:42:00 -0700 Subject: [PATCH 02/15] comment Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dataproxy/service.go b/dataproxy/service.go index 65c4c6f21..0478d3e5d 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -257,6 +257,7 @@ const ( func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, int, IOType, error) { // flyteUrl is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] // where i stands for inputs.pb o for outputs.pb and d for the flyte deck + // todo: should we move iod to the front? re, err := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([0-9]+)/[iod]") if err != nil { return core.NodeExecutionIdentifier{}, 0, err From 6694aac0ce39ec71873e7f1d9d6eaf22e6518d0b Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Tue, 28 Mar 2023 15:21:29 -0700 Subject: [PATCH 03/15] add basic lookup Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 88 ++++++++++++++++++++++++++++++--------- dataproxy/service_test.go | 18 ++++---- pkg/server/service.go | 2 +- 3 files changed, 78 insertions(+), 30 deletions(-) diff --git a/dataproxy/service.go b/dataproxy/service.go index 0478d3e5d..636fec7f6 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "fmt" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/logger" "net/url" "reflect" "regexp" @@ -41,6 +42,7 @@ type Service struct { dataStore *storage.DataStore shardSelector ioutils.ShardSelector nodeExecutionManager interfaces.NodeExecutionInterface + taskExecutionManager interfaces.TaskExecutionInterface } // CreateUploadLocation creates a temporary signed url to allow callers to upload content. @@ -254,29 +256,34 @@ const ( DECK ) -func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, int, IOType, error) { +func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, *int, IOType, error) { // flyteUrl is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] // where i stands for inputs.pb o for outputs.pb and d for the flyte deck - // todo: should we move iod to the front? - re, err := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([0-9]+)/[iod]") + // If the retry attempt is missing, the io requested is assumed to be for the node instead of the task execution + zero := 0 + re, err := regexp.Compile("flyte://v1/([iod])/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?") if err != nil { - return core.NodeExecutionIdentifier{}, 0, err + return core.NodeExecutionIdentifier{}, &zero, 0, err } re.MatchString(flyteUrl) matches := re.FindStringSubmatch(flyteUrl) - if len(matches) != 7 { - return core.NodeExecutionIdentifier{}, 0, 0, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) - } - proj := matches[1] - domain := matches[2] - executionId := matches[3] - nodeId := matches[4] - attempt, err := strconv.Atoi(matches[5]) - if err != nil { - return core.NodeExecutionIdentifier{}, 0, 0, fmt.Errorf("failed to parse attempt, %s", err) + if len(matches) != 7 && len(matches) != 6 { + return core.NodeExecutionIdentifier{}, &zero, 0, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) + } + proj := matches[2] + domain := matches[3] + executionId := matches[4] + nodeId := matches[5] + var attempt *int // nil means node execution, not a task execution + if len(matches) == 7 && matches[6] != "" { + a, err := strconv.Atoi(matches[6]) + if err != nil { + return core.NodeExecutionIdentifier{}, &zero, 0, fmt.Errorf("failed to parse attempt, %s", err) + } + attempt = &a } var ioType IOType - switch matches[6] { + switch matches[1] { case "i": ioType = INPUT case "o": @@ -295,11 +302,13 @@ func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, int, IOType, }, attempt, ioType, nil } +// ResolveArtifact tries to return the raw remote URL. In cases where only the raw data is available, we will return +// an error code that the frontend (flytekit) should know how to handle. func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifactRequest) ( *service.ResolveArtifactResponse, error) { - fmt.Printf("+++++++++++++++++++++++++++++++ request\n%v\n", req) - fmt.Printf("extracted url query: %s\n", req.GetFlyteUrl()) + logger.Debugf(ctx, "resolving flyte url query: %s", req.GetFlyteUrl()) + var resolvedURL string err := s.validateResolveArtifactRequest(req) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to validate resolve artifact request. Error: %v", err) @@ -311,16 +320,54 @@ func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifa return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to parse artifact url Error: %v", err) } - // Get the task executions for the node execution + logger.Debugf(ctx, "resolved to node exec id %s, attempt %v, type %d", nodeExecId, attempt, ioType) + // always get the node execution + node, err := s.nodeExecutionManager.GetNodeExecution(ctx, admin.NodeExecutionGetRequest{ + Id: &nodeExecId, + }) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to find node execution [%v]. Error: %v", nodeExecId, err) + } + if attempt == nil || ioType == DECK { + // get the node execution io link, if available. + if ioType == INPUT { + resolvedURL = node.InputUri + } else if ioType == OUTPUT { + // todo: why is this deprecated, this is what the get data endpoint uses. + resolvedURL = node.Closure.GetOutputUri() + } else if ioType == DECK { + // todo: why is there no deck uri for task closure? + resolvedURL = node.Closure.DeckUri + } + } else { + taskExecs, err := s.taskExecutionManager.ListTaskExecutions(ctx, admin.TaskExecutionListRequest{ + NodeExecutionId: &nodeExecId, + Limit: 1, + Filters: fmt.Sprintf("eq(retry_attempt,%s)", strconv.Itoa(*attempt)), + }) + if err != nil || len(taskExecs.TaskExecutions) == 0 { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to list task executions [%v]. Error: %v", nodeExecId, err) + } + taskExec := taskExecs.TaskExecutions[0] + if ioType == INPUT { + resolvedURL = taskExec.InputUri + } else if ioType == OUTPUT { + resolvedURL = taskExec.Closure.GetOutputUri() + } + } + if resolvedURL == "" { + return nil, errors.NewFlyteAdminErrorf(codes.NotFound, "failed to resolve [%s]. Error: %v", req.GetFlyteUrl(), err) + } return &service.ResolveArtifactResponse{ - NativeUrl: "somes3link", + NativeUrl: resolvedURL, }, nil } func NewService(cfg config.DataProxyConfig, nodeExec interfaces.NodeExecutionInterface, - dataStore *storage.DataStore) (Service, error) { + dataStore *storage.DataStore, + taskExec interfaces.TaskExecutionInterface) (Service, error) { // Context is not used in the constructor. Should ideally be removed. selector, err := ioutils.NewBase36PrefixShardSelector(context.TODO()) @@ -333,5 +380,6 @@ func NewService(cfg config.DataProxyConfig, dataStore: dataStore, shardSelector: selector, nodeExecutionManager: nodeExec, + taskExecutionManager: taskExec, }, nil } diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index 0aa176ffe..ffd674bbf 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -3,7 +3,6 @@ package dataproxy import ( "context" "fmt" - "regexp" "testing" "time" @@ -166,13 +165,14 @@ func TestCreateDownloadLocation(t *testing.T) { func TestParseFlyteUrl(t *testing.T) { t.Run("valid", func(t *testing.T) { - re, _ := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([0-9]+)") - xx := re.FindStringSubmatch("flyte://v1/fs/dev/abc/n0/0") - fmt.Println(xx) + ne, attempt, kind, err := ParseFlyteUrl("flyte://v1/i/fs/dev/abc/n0/0") + assert.NoError(t, err) + fmt.Println(ne, attempt, kind, err) + ne, attempt, kind, err = ParseFlyteUrl("flyte://v1/i/fs/dev/abc/n0") + assert.NoError(t, err) + fmt.Println(ne, attempt, kind, err) + ne, attempt, kind, err = ParseFlyteUrl("flyte://v1/i/fs/dev/abc/n0/") + assert.NoError(t, err) + fmt.Println(ne, attempt, kind, err) }) - - //t.Run("invalid", func(t *testing.T) { - // _, err := parseFlyteUrl("bucket/key") - // assert.Error(t, err) - //}) } diff --git a/pkg/server/service.go b/pkg/server/service.go index 4f1f58ffb..1fe2f57c1 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -119,7 +119,7 @@ func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *c service.RegisterIdentityServiceServer(grpcServer, authCtx.IdentityService()) } - dataProxySvc, err := dataproxy.NewService(cfg.DataProxy, adminServer.NodeExecutionManager, dataStorageClient) + dataProxySvc, err := dataproxy.NewService(cfg.DataProxy, adminServer.NodeExecutionManager, dataStorageClient, adminServer.TaskExecutionManager) if err != nil { return nil, fmt.Errorf("failed to initialize dataProxy service. Error: %w", err) } From 1e86fe11821a2076d3591b828d10e355c270c304 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 29 Mar 2023 15:52:03 -0700 Subject: [PATCH 04/15] add lookup to download link service as well Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 48 ++++++++++++++++++++++++++++-------------- flyteadmin_config.yaml | 2 +- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/dataproxy/service.go b/dataproxy/service.go index 636fec7f6..00f530b2b 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -121,6 +121,15 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown switch req.GetArtifactType() { case service.ArtifactType_ARTIFACT_TYPE_DECK: nativeURL = node.Closure.DeckUri + case service.ArtifactType_ARTIFACT_TYPE_INPUT: + nativeURL = node.InputUri + case service.ArtifactType_ARTIFACT_TYPE_OUTPUT: + nativeURL = node.Closure.GetOutputUri() + } + } else if flyteURLEnvelope, casted := req.GetSource().(*service.CreateDownloadLinkRequest_FlyteUrl); casted { + nativeURL, err = s.ResolveFlyteURL(ctx, flyteURLEnvelope.FlyteUrl) + if err != nil { + return nil, err } } else { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "unsupported source [%v]", reflect.TypeOf(req.GetSource())) @@ -302,22 +311,12 @@ func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, *int, IOType, }, attempt, ioType, nil } -// ResolveArtifact tries to return the raw remote URL. In cases where only the raw data is available, we will return -// an error code that the frontend (flytekit) should know how to handle. -func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifactRequest) ( - *service.ResolveArtifactResponse, error) { - - logger.Debugf(ctx, "resolving flyte url query: %s", req.GetFlyteUrl()) +func (s Service) ResolveFlyteURL(ctx context.Context, flyteURL string) (string, error) { var resolvedURL string - err := s.validateResolveArtifactRequest(req) - if err != nil { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to validate resolve artifact request. Error: %v", err) - } - // Get the node execution id and other information - nodeExecId, attempt, ioType, err := ParseFlyteUrl(req.GetFlyteUrl()) + nodeExecId, attempt, ioType, err := ParseFlyteUrl(flyteURL) if err != nil { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to parse artifact url Error: %v", err) + return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to parse artifact url Error: %v", err) } logger.Debugf(ctx, "resolved to node exec id %s, attempt %v, type %d", nodeExecId, attempt, ioType) @@ -326,7 +325,7 @@ func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifa Id: &nodeExecId, }) if err != nil { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to find node execution [%v]. Error: %v", nodeExecId, err) + return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to find node execution [%v]. Error: %v", nodeExecId, err) } if attempt == nil || ioType == DECK { // get the node execution io link, if available. @@ -346,7 +345,7 @@ func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifa Filters: fmt.Sprintf("eq(retry_attempt,%s)", strconv.Itoa(*attempt)), }) if err != nil || len(taskExecs.TaskExecutions) == 0 { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to list task executions [%v]. Error: %v", nodeExecId, err) + return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to list task executions [%v]. Error: %v", nodeExecId, err) } taskExec := taskExecs.TaskExecutions[0] if ioType == INPUT { @@ -356,7 +355,24 @@ func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifa } } if resolvedURL == "" { - return nil, errors.NewFlyteAdminErrorf(codes.NotFound, "failed to resolve [%s]. Error: %v", req.GetFlyteUrl(), err) + return "", errors.NewFlyteAdminErrorf(codes.NotFound, "failed to resolve [%s]. Error: %v", flyteURL, err) + } + return resolvedURL, nil +} + +// ResolveArtifact tries to return the raw remote URL. In cases where only the raw data is available, we will return +// an error code that the frontend (flytekit) should know how to handle. +func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifactRequest) ( + *service.ResolveArtifactResponse, error) { + + logger.Debugf(ctx, "resolving flyte url query: %s", req.GetFlyteUrl()) + err := s.validateResolveArtifactRequest(req) + if err != nil { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to validate resolve artifact request. Error: %v", err) + } + resolvedURL, err := s.ResolveFlyteURL(ctx, req.GetFlyteUrl()) + if err != nil { + return nil, err } return &service.ResolveArtifactResponse{ diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index 964f83a81..7b48ab5ef 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -129,7 +129,7 @@ storage: secret_key: miniostorage signedUrl: stowConfigOverride: - endpoint: http://localhost:30084 + endpoint: http://localhost:30002 cache: max_size_mbs: 10 target_gc_percent: 100 From 6b7aa38ff7521de2939ef9f32afaa70db2b6208a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 19 Apr 2023 16:29:13 -0700 Subject: [PATCH 05/15] use new idl Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 164 +++++++++++++++++++++----------------- dataproxy/service_test.go | 18 +++-- flyteadmin_config.yaml | 2 +- 3 files changed, 102 insertions(+), 82 deletions(-) diff --git a/dataproxy/service.go b/dataproxy/service.go index 00f530b2b..a01ddda49 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -5,8 +5,6 @@ import ( "encoding/base32" "encoding/base64" "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flytestdlib/logger" "net/url" "reflect" "regexp" @@ -14,6 +12,9 @@ import ( "strings" "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flyteadmin/pkg/errors" "google.golang.org/grpc/codes" @@ -121,15 +122,6 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown switch req.GetArtifactType() { case service.ArtifactType_ARTIFACT_TYPE_DECK: nativeURL = node.Closure.DeckUri - case service.ArtifactType_ARTIFACT_TYPE_INPUT: - nativeURL = node.InputUri - case service.ArtifactType_ARTIFACT_TYPE_OUTPUT: - nativeURL = node.Closure.GetOutputUri() - } - } else if flyteURLEnvelope, casted := req.GetSource().(*service.CreateDownloadLinkRequest_FlyteUrl); casted { - nativeURL, err = s.ResolveFlyteURL(ctx, flyteURLEnvelope.FlyteUrl) - if err != nil { - return nil, err } } else { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "unsupported source [%v]", reflect.TypeOf(req.GetSource())) @@ -246,7 +238,7 @@ func createStorageLocation(ctx context.Context, store *storage.DataStore, return storagePath, nil } -func (s Service) validateResolveArtifactRequest(req *service.ResolveArtifactRequest) error { +func (s Service) validateResolveArtifactRequest(req *service.GetDataRequest) error { if req.GetFlyteUrl() == "" { return fmt.Errorf("source is required. Provided empty string") } @@ -265,34 +257,34 @@ const ( DECK ) -func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, *int, IOType, error) { - // flyteUrl is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] +func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, IOType, error) { + // flyteURL is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] // where i stands for inputs.pb o for outputs.pb and d for the flyte deck // If the retry attempt is missing, the io requested is assumed to be for the node instead of the task execution zero := 0 - re, err := regexp.Compile("flyte://v1/([iod])/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?") + re, err := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])") if err != nil { return core.NodeExecutionIdentifier{}, &zero, 0, err } - re.MatchString(flyteUrl) - matches := re.FindStringSubmatch(flyteUrl) + re.MatchString(flyteURL) + matches := re.FindStringSubmatch(flyteURL) if len(matches) != 7 && len(matches) != 6 { return core.NodeExecutionIdentifier{}, &zero, 0, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) } - proj := matches[2] - domain := matches[3] - executionId := matches[4] - nodeId := matches[5] + proj := matches[1] + domain := matches[2] + executionID := matches[3] + nodeID := matches[4] var attempt *int // nil means node execution, not a task execution - if len(matches) == 7 && matches[6] != "" { - a, err := strconv.Atoi(matches[6]) + if len(matches) == 7 && matches[5] != "" { + a, err := strconv.Atoi(matches[5]) if err != nil { return core.NodeExecutionIdentifier{}, &zero, 0, fmt.Errorf("failed to parse attempt, %s", err) } attempt = &a } var ioType IOType - switch matches[1] { + switch matches[len(matches)-1] { case "i": ioType = INPUT case "o": @@ -302,81 +294,105 @@ func ParseFlyteUrl(flyteUrl string) (core.NodeExecutionIdentifier, *int, IOType, } return core.NodeExecutionIdentifier{ - NodeId: nodeId, + NodeId: nodeID, ExecutionId: &core.WorkflowExecutionIdentifier{ Project: proj, Domain: domain, - Name: executionId, + Name: executionID, }, }, attempt, ioType, nil } -func (s Service) ResolveFlyteURL(ctx context.Context, flyteURL string) (string, error) { - var resolvedURL string - // Get the node execution id and other information - nodeExecId, attempt, ioType, err := ParseFlyteUrl(flyteURL) +func (s Service) GetTaskExecutionID(ctx context.Context, attempt int, nodeExecID core.NodeExecutionIdentifier) (*core.TaskExecutionIdentifier, error) { + taskExecs, err := s.taskExecutionManager.ListTaskExecutions(ctx, admin.TaskExecutionListRequest{ + NodeExecutionId: &nodeExecID, + Limit: 1, + Filters: fmt.Sprintf("eq(retry_attempt,%s)", strconv.Itoa(attempt)), + }) + if err != nil || len(taskExecs.TaskExecutions) == 0 { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to list task executions [%v]. Error: %v", nodeExecID, err) + } + taskExec := taskExecs.TaskExecutions[0] + return taskExec.Id, nil +} + +func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( + *service.GetDataResponse, error) { + + logger.Debugf(ctx, "resolving flyte url query: %s", req.GetFlyteUrl()) + err := s.validateResolveArtifactRequest(req) if err != nil { - return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to parse artifact url Error: %v", err) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to validate resolve artifact request. Error: %v", err) } - logger.Debugf(ctx, "resolved to node exec id %s, attempt %v, type %d", nodeExecId, attempt, ioType) - // always get the node execution - node, err := s.nodeExecutionManager.GetNodeExecution(ctx, admin.NodeExecutionGetRequest{ - Id: &nodeExecId, - }) + nodeExecID, attempt, ioType, err := ParseFlyteURL(req.GetFlyteUrl()) if err != nil { - return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to find node execution [%v]. Error: %v", nodeExecId, err) + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to parse artifact url Error: %v", err) } - if attempt == nil || ioType == DECK { - // get the node execution io link, if available. - if ioType == INPUT { - resolvedURL = node.InputUri - } else if ioType == OUTPUT { - // todo: why is this deprecated, this is what the get data endpoint uses. - resolvedURL = node.Closure.GetOutputUri() - } else if ioType == DECK { - // todo: why is there no deck uri for task closure? - resolvedURL = node.Closure.DeckUri - } - } else { - taskExecs, err := s.taskExecutionManager.ListTaskExecutions(ctx, admin.TaskExecutionListRequest{ - NodeExecutionId: &nodeExecId, - Limit: 1, - Filters: fmt.Sprintf("eq(retry_attempt,%s)", strconv.Itoa(*attempt)), + + // Get the data location, then decide how/where to fetch it from + if attempt == nil { + resp, err := s.nodeExecutionManager.GetNodeExecutionData(ctx, admin.NodeExecutionGetDataRequest{ + Id: &nodeExecID, }) - if err != nil || len(taskExecs.TaskExecutions) == 0 { - return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to list task executions [%v]. Error: %v", nodeExecId, err) + if err != nil { + return nil, err } - taskExec := taskExecs.TaskExecutions[0] + + var lm *core.LiteralMap if ioType == INPUT { - resolvedURL = taskExec.InputUri + lm = resp.FullInputs } else if ioType == OUTPUT { - resolvedURL = taskExec.Closure.GetOutputUri() + lm = resp.FullOutputs + } else { + // Assume deck, and create a download link request + dlRequest := service.CreateDownloadLinkRequest{ + ArtifactType: service.ArtifactType_ARTIFACT_TYPE_DECK, + Source: &service.CreateDownloadLinkRequest_NodeExecutionId{NodeExecutionId: &nodeExecID}, + } + resp, err := s.CreateDownloadLink(ctx, &dlRequest) + if err != nil { + return nil, err + } + return &service.GetDataResponse{ + Data: &service.GetDataResponse_FlyteDeckDownloadLink{ + FlyteDeckDownloadLink: resp, + }, + }, nil } + + return &service.GetDataResponse{ + Data: &service.GetDataResponse_LiteralMap{ + LiteralMap: lm, + }, + }, nil } - if resolvedURL == "" { - return "", errors.NewFlyteAdminErrorf(codes.NotFound, "failed to resolve [%s]. Error: %v", flyteURL, err) + // Rest of the logic handles task attempt lookups + var lm *core.LiteralMap + taskExecID, err := s.GetTaskExecutionID(ctx, *attempt, nodeExecID) + if err != nil { + return nil, err } - return resolvedURL, nil -} - -// ResolveArtifact tries to return the raw remote URL. In cases where only the raw data is available, we will return -// an error code that the frontend (flytekit) should know how to handle. -func (s Service) ResolveArtifact(ctx context.Context, req *service.ResolveArtifactRequest) ( - *service.ResolveArtifactResponse, error) { - logger.Debugf(ctx, "resolving flyte url query: %s", req.GetFlyteUrl()) - err := s.validateResolveArtifactRequest(req) - if err != nil { - return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to validate resolve artifact request. Error: %v", err) + reqT := admin.TaskExecutionGetDataRequest{ + Id: taskExecID, } - resolvedURL, err := s.ResolveFlyteURL(ctx, req.GetFlyteUrl()) + resp, err := s.taskExecutionManager.GetTaskExecutionData(ctx, reqT) if err != nil { return nil, err } - return &service.ResolveArtifactResponse{ - NativeUrl: resolvedURL, + if ioType == INPUT { + lm = resp.FullInputs + } else if ioType == OUTPUT { + lm = resp.FullOutputs + } else { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "deck type cannot be specified with a retry attempt, just use the node instead") + } + return &service.GetDataResponse{ + Data: &service.GetDataResponse_LiteralMap{ + LiteralMap: lm, + }, }, nil } diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index ffd674bbf..14cefad8f 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -33,9 +33,10 @@ func TestNewService(t *testing.T) { assert.NoError(t, err) nodeExecutionManager := &mocks.MockNodeExecutionManager{} + taskExecutionManager := &mocks.MockTaskExecutionManager{} s, err := NewService(config.DataProxyConfig{ Upload: config.DataProxyUploadConfig{}, - }, nodeExecutionManager, dataStore) + }, nodeExecutionManager, dataStore, taskExecutionManager) assert.NoError(t, err) assert.NotNil(t, s) } @@ -58,7 +59,8 @@ func TestCreateUploadLocation(t *testing.T) { dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) assert.NoError(t, err) nodeExecutionManager := &mocks.MockNodeExecutionManager{} - s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, dataStore) + taskExecutionManager := &mocks.MockTaskExecutionManager{} + s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, dataStore, taskExecutionManager) assert.NoError(t, err) t.Run("No project/domain", func(t *testing.T) { _, err = s.CreateUploadLocation(context.Background(), &service.CreateUploadLocationRequest{}) @@ -93,8 +95,9 @@ func TestCreateDownloadLink(t *testing.T) { }, }, nil }) + taskExecutionManager := &mocks.MockTaskExecutionManager{} - s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore) + s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore, taskExecutionManager) assert.NoError(t, err) t.Run("Invalid expiry", func(t *testing.T) { @@ -129,7 +132,8 @@ func TestCreateDownloadLink(t *testing.T) { func TestCreateDownloadLocation(t *testing.T) { dataStore := commonMocks.GetMockStorageClient() nodeExecutionManager := &mocks.MockNodeExecutionManager{} - s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore) + taskExecutionManager := &mocks.MockTaskExecutionManager{} + s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore, taskExecutionManager) assert.NoError(t, err) t.Run("Invalid expiry", func(t *testing.T) { @@ -165,13 +169,13 @@ func TestCreateDownloadLocation(t *testing.T) { func TestParseFlyteUrl(t *testing.T) { t.Run("valid", func(t *testing.T) { - ne, attempt, kind, err := ParseFlyteUrl("flyte://v1/i/fs/dev/abc/n0/0") + ne, attempt, kind, err := ParseFlyteURL("flyte://v1/fs/dev/abc/n0/0/o") assert.NoError(t, err) fmt.Println(ne, attempt, kind, err) - ne, attempt, kind, err = ParseFlyteUrl("flyte://v1/i/fs/dev/abc/n0") + ne, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0/i") assert.NoError(t, err) fmt.Println(ne, attempt, kind, err) - ne, attempt, kind, err = ParseFlyteUrl("flyte://v1/i/fs/dev/abc/n0/") + ne, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0/d") assert.NoError(t, err) fmt.Println(ne, attempt, kind, err) }) diff --git a/flyteadmin_config.yaml b/flyteadmin_config.yaml index 7b48ab5ef..e3d19f732 100644 --- a/flyteadmin_config.yaml +++ b/flyteadmin_config.yaml @@ -114,7 +114,7 @@ externalEvents: eventTypes: all Logger: show-source: true - level: 6 + level: 5 storage: type: stow stow: From 3d81bc84db229d1a6c107ecdf4d981a16c6dbf11 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 20 Apr 2023 18:43:51 -0700 Subject: [PATCH 06/15] use new endpoints Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 66 ++-------------- dataproxy/service_test.go | 7 +- pkg/common/flyte_url.go | 92 ++++++++++++++++++++++ pkg/manager/impl/node_execution_manager.go | 1 + pkg/manager/impl/task_execution_manager.go | 1 + 5 files changed, 104 insertions(+), 63 deletions(-) create mode 100644 pkg/common/flyte_url.go diff --git a/dataproxy/service.go b/dataproxy/service.go index a01ddda49..1b23ceab7 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -5,9 +5,9 @@ import ( "encoding/base32" "encoding/base64" "fmt" + "github.com/flyteorg/flyteadmin/pkg/common" "net/url" "reflect" - "regexp" "strconv" "strings" "time" @@ -249,60 +249,6 @@ func (s Service) validateResolveArtifactRequest(req *service.GetDataRequest) err return nil } -type IOType int - -const ( - INPUT = iota - OUTPUT - DECK -) - -func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, IOType, error) { - // flyteURL is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] - // where i stands for inputs.pb o for outputs.pb and d for the flyte deck - // If the retry attempt is missing, the io requested is assumed to be for the node instead of the task execution - zero := 0 - re, err := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])") - if err != nil { - return core.NodeExecutionIdentifier{}, &zero, 0, err - } - re.MatchString(flyteURL) - matches := re.FindStringSubmatch(flyteURL) - if len(matches) != 7 && len(matches) != 6 { - return core.NodeExecutionIdentifier{}, &zero, 0, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) - } - proj := matches[1] - domain := matches[2] - executionID := matches[3] - nodeID := matches[4] - var attempt *int // nil means node execution, not a task execution - if len(matches) == 7 && matches[5] != "" { - a, err := strconv.Atoi(matches[5]) - if err != nil { - return core.NodeExecutionIdentifier{}, &zero, 0, fmt.Errorf("failed to parse attempt, %s", err) - } - attempt = &a - } - var ioType IOType - switch matches[len(matches)-1] { - case "i": - ioType = INPUT - case "o": - ioType = OUTPUT - case "d": - ioType = DECK - } - - return core.NodeExecutionIdentifier{ - NodeId: nodeID, - ExecutionId: &core.WorkflowExecutionIdentifier{ - Project: proj, - Domain: domain, - Name: executionID, - }, - }, attempt, ioType, nil -} - func (s Service) GetTaskExecutionID(ctx context.Context, attempt int, nodeExecID core.NodeExecutionIdentifier) (*core.TaskExecutionIdentifier, error) { taskExecs, err := s.taskExecutionManager.ListTaskExecutions(ctx, admin.TaskExecutionListRequest{ NodeExecutionId: &nodeExecID, @@ -325,7 +271,7 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to validate resolve artifact request. Error: %v", err) } - nodeExecID, attempt, ioType, err := ParseFlyteURL(req.GetFlyteUrl()) + nodeExecID, attempt, ioType, err := common.ParseFlyteURL(req.GetFlyteUrl()) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to parse artifact url Error: %v", err) } @@ -340,9 +286,9 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( } var lm *core.LiteralMap - if ioType == INPUT { + if ioType == common.INPUT { lm = resp.FullInputs - } else if ioType == OUTPUT { + } else if ioType == common.OUTPUT { lm = resp.FullOutputs } else { // Assume deck, and create a download link request @@ -382,9 +328,9 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( return nil, err } - if ioType == INPUT { + if ioType == common.INPUT { lm = resp.FullInputs - } else if ioType == OUTPUT { + } else if ioType == common.OUTPUT { lm = resp.FullOutputs } else { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "deck type cannot be specified with a retry attempt, just use the node instead") diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index 14cefad8f..c5519e2d9 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -3,6 +3,7 @@ package dataproxy import ( "context" "fmt" + "github.com/flyteorg/flyteadmin/pkg/common" "testing" "time" @@ -169,13 +170,13 @@ func TestCreateDownloadLocation(t *testing.T) { func TestParseFlyteUrl(t *testing.T) { t.Run("valid", func(t *testing.T) { - ne, attempt, kind, err := ParseFlyteURL("flyte://v1/fs/dev/abc/n0/0/o") + ne, attempt, kind, err := common.ParseFlyteURL("flyte://v1/fs/dev/abc/n0/0/o") assert.NoError(t, err) fmt.Println(ne, attempt, kind, err) - ne, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0/i") + ne, attempt, kind, err = common.ParseFlyteURL("flyte://v1/fs/dev/abc/n0/i") assert.NoError(t, err) fmt.Println(ne, attempt, kind, err) - ne, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0/d") + ne, attempt, kind, err = common.ParseFlyteURL("flyte://v1/fs/dev/abc/n0/d") assert.NoError(t, err) fmt.Println(ne, attempt, kind, err) }) diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go new file mode 100644 index 000000000..cf99c1e25 --- /dev/null +++ b/pkg/common/flyte_url.go @@ -0,0 +1,92 @@ +package common + +import ( + "fmt" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "regexp" + "strconv" +) + +type IOType string + +const ( + UndefinedIo IOType = "" + INPUT = "i" + OUTPUT = "o" + DECK = "d" +) + +func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, IOType, error) { + // flyteURL is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] + // where i stands for inputs.pb o for outputs.pb and d for the flyte deck + // If the retry attempt is missing, the io requested is assumed to be for the node instead of the task execution + zero := 0 + re, err := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])") + if err != nil { + return core.NodeExecutionIdentifier{}, &zero, UndefinedIo, err + } + re.MatchString(flyteURL) + matches := re.FindStringSubmatch(flyteURL) + if len(matches) != 7 && len(matches) != 6 { + return core.NodeExecutionIdentifier{}, &zero, UndefinedIo, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) + } + proj := matches[1] + domain := matches[2] + executionID := matches[3] + nodeID := matches[4] + var attempt *int // nil means node execution, not a task execution + if len(matches) == 7 && matches[5] != "" { + a, err := strconv.Atoi(matches[5]) + if err != nil { + return core.NodeExecutionIdentifier{}, &zero, UndefinedIo, fmt.Errorf("failed to parse attempt, %s", err) + } + attempt = &a + } + var ioType IOType + switch matches[len(matches)-1] { + case "i": + ioType = INPUT + case "o": + ioType = OUTPUT + case "d": + ioType = DECK + } + + return core.NodeExecutionIdentifier{ + NodeId: nodeID, + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: proj, + Domain: domain, + Name: executionID, + }, + }, attempt, ioType, nil +} + +func FlyteURLsFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, deck bool) *admin.FlyteURLs { + base := fmt.Sprintf("flyte://v1/%s/%s/%s/%s", nodeExecutionID.ExecutionId.Project, + nodeExecutionID.ExecutionId.Domain, nodeExecutionID.ExecutionId.Name, nodeExecutionID.NodeId) + + res := &admin.FlyteURLs{ + Inputs: fmt.Sprintf("%s/%s", base, INPUT), + Outputs: fmt.Sprintf("%s/%s", base, OUTPUT), + } + if deck { + res.Deck = fmt.Sprintf("%s/%s", base, DECK) + } + return res +} + +func FlyteURLsFromTaskExecutionID(taskExecutionID core.TaskExecutionIdentifier, deck bool) *admin.FlyteURLs { + base := fmt.Sprintf("flyte://v1/%s/%s/%s/%s/%s", taskExecutionID.NodeExecutionId.ExecutionId.Project, + taskExecutionID.NodeExecutionId.ExecutionId.Domain, taskExecutionID.NodeExecutionId.ExecutionId.Name, taskExecutionID.NodeExecutionId.NodeId, strconv.Itoa(int(taskExecutionID.RetryAttempt))) + + res := &admin.FlyteURLs{ + Inputs: fmt.Sprintf("%s/%s", base, INPUT), + Outputs: fmt.Sprintf("%s/%s", base, OUTPUT), + } + if deck { + res.Deck = fmt.Sprintf("%s/%s", base, DECK) + } + return res +} diff --git a/pkg/manager/impl/node_execution_manager.go b/pkg/manager/impl/node_execution_manager.go index ae5a7bb40..bcc4362db 100644 --- a/pkg/manager/impl/node_execution_manager.go +++ b/pkg/manager/impl/node_execution_manager.go @@ -521,6 +521,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData( Outputs: outputURLBlob, FullInputs: inputs, FullOutputs: outputs, + FlyteUrls: common.FlyteURLsFromNodeExecutionID(*request.Id, nodeExecution.GetClosure() != nil && nodeExecution.GetClosure().GetDeckUri() != ""), } if len(nodeExecutionModel.DynamicWorkflowRemoteClosureReference) > 0 { diff --git a/pkg/manager/impl/task_execution_manager.go b/pkg/manager/impl/task_execution_manager.go index 60825309b..46967f264 100644 --- a/pkg/manager/impl/task_execution_manager.go +++ b/pkg/manager/impl/task_execution_manager.go @@ -331,6 +331,7 @@ func (m *TaskExecutionManager) GetTaskExecutionData( Outputs: outputURLBlob, FullInputs: inputs, FullOutputs: outputs, + FlyteUrls: common.FlyteURLsFromTaskExecutionID(*request.Id, false), } m.metrics.TaskExecutionInputBytes.Observe(float64(response.Inputs.Bytes)) From 002bf283c09c4d66e1feefd47e9a33de329ae815 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 21 Apr 2023 20:01:57 -0700 Subject: [PATCH 07/15] use enumer Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 11 ++++--- dataproxy/service_test.go | 3 +- pkg/common/flyte_url.go | 64 +++++++++++++++++++++---------------- pkg/common/iotype_enumer.go | 51 +++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 34 deletions(-) create mode 100644 pkg/common/iotype_enumer.go diff --git a/dataproxy/service.go b/dataproxy/service.go index 1b23ceab7..942f3a46c 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -5,13 +5,14 @@ import ( "encoding/base32" "encoding/base64" "fmt" - "github.com/flyteorg/flyteadmin/pkg/common" "net/url" "reflect" "strconv" "strings" "time" + "github.com/flyteorg/flyteadmin/pkg/common" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/logger" @@ -286,9 +287,9 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( } var lm *core.LiteralMap - if ioType == common.INPUT { + if ioType == common.IOType_i { lm = resp.FullInputs - } else if ioType == common.OUTPUT { + } else if ioType == common.IOType_o { lm = resp.FullOutputs } else { // Assume deck, and create a download link request @@ -328,9 +329,9 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( return nil, err } - if ioType == common.INPUT { + if ioType == common.IOType_i { lm = resp.FullInputs - } else if ioType == common.OUTPUT { + } else if ioType == common.IOType_o { lm = resp.FullOutputs } else { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "deck type cannot be specified with a retry attempt, just use the node instead") diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index c5519e2d9..99b8bd9d9 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -3,10 +3,11 @@ package dataproxy import ( "context" "fmt" - "github.com/flyteorg/flyteadmin/pkg/common" "testing" "time" + "github.com/flyteorg/flyteadmin/pkg/common" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index cf99c1e25..5c57634e9 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -2,34 +2,38 @@ package common import ( "fmt" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "regexp" "strconv" + "strings" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) -type IOType string +//go:generate enumer --type=IOType --trimprefix=IOType_ +type IOType int + +// The suffixes in these constants are used to match against the tail end of the flyte url, to keep tne flyte url simpler +// +//nolint:all const ( - UndefinedIo IOType = "" - INPUT = "i" - OUTPUT = "o" - DECK = "d" + IOType_undefined IOType = iota + IOType_i + IOType_o + IOType_d ) +var re = regexp.MustCompile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])") + func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, IOType, error) { // flyteURL is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] // where i stands for inputs.pb o for outputs.pb and d for the flyte deck // If the retry attempt is missing, the io requested is assumed to be for the node instead of the task execution - zero := 0 - re, err := regexp.Compile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])") - if err != nil { - return core.NodeExecutionIdentifier{}, &zero, UndefinedIo, err - } re.MatchString(flyteURL) matches := re.FindStringSubmatch(flyteURL) if len(matches) != 7 && len(matches) != 6 { - return core.NodeExecutionIdentifier{}, &zero, UndefinedIo, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) + return core.NodeExecutionIdentifier{}, nil, IOType_undefined, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) } proj := matches[1] domain := matches[2] @@ -39,19 +43,23 @@ func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, IOType, if len(matches) == 7 && matches[5] != "" { a, err := strconv.Atoi(matches[5]) if err != nil { - return core.NodeExecutionIdentifier{}, &zero, UndefinedIo, fmt.Errorf("failed to parse attempt, %s", err) + return core.NodeExecutionIdentifier{}, nil, IOType_undefined, fmt.Errorf("failed to parse attempt, %s", err) } attempt = &a } - var ioType IOType - switch matches[len(matches)-1] { - case "i": - ioType = INPUT - case "o": - ioType = OUTPUT - case "d": - ioType = DECK + ioLower := strings.ToLower(matches[len(matches)-1]) + ioType, err := IOTypeString(ioLower) + if err != nil { + return core.NodeExecutionIdentifier{}, nil, IOType_undefined, err } + //switch matches[len(matches)-1] { + //case "i": + // ioType = I + //case "o": + // ioType = OUTPUT + //case "d": + // ioType = DECK + //} return core.NodeExecutionIdentifier{ NodeId: nodeID, @@ -68,11 +76,11 @@ func FlyteURLsFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, nodeExecutionID.ExecutionId.Domain, nodeExecutionID.ExecutionId.Name, nodeExecutionID.NodeId) res := &admin.FlyteURLs{ - Inputs: fmt.Sprintf("%s/%s", base, INPUT), - Outputs: fmt.Sprintf("%s/%s", base, OUTPUT), + Inputs: fmt.Sprintf("%s/%s", base, IOType_i), + Outputs: fmt.Sprintf("%s/%s", base, IOType_o), } if deck { - res.Deck = fmt.Sprintf("%s/%s", base, DECK) + res.Deck = fmt.Sprintf("%s/%s", base, IOType_d) } return res } @@ -82,11 +90,11 @@ func FlyteURLsFromTaskExecutionID(taskExecutionID core.TaskExecutionIdentifier, taskExecutionID.NodeExecutionId.ExecutionId.Domain, taskExecutionID.NodeExecutionId.ExecutionId.Name, taskExecutionID.NodeExecutionId.NodeId, strconv.Itoa(int(taskExecutionID.RetryAttempt))) res := &admin.FlyteURLs{ - Inputs: fmt.Sprintf("%s/%s", base, INPUT), - Outputs: fmt.Sprintf("%s/%s", base, OUTPUT), + Inputs: fmt.Sprintf("%s/%s", base, IOType_i), + Outputs: fmt.Sprintf("%s/%s", base, IOType_o), } if deck { - res.Deck = fmt.Sprintf("%s/%s", base, DECK) + res.Deck = fmt.Sprintf("%s/%s", base, IOType_d) } return res } diff --git a/pkg/common/iotype_enumer.go b/pkg/common/iotype_enumer.go new file mode 100644 index 000000000..f68969a9d --- /dev/null +++ b/pkg/common/iotype_enumer.go @@ -0,0 +1,51 @@ +// Code generated by "enumer --type=IOType --trimprefix=IOType_"; DO NOT EDIT. + +package common + +import ( + "fmt" +) + +const _IOTypeName = "undefinediod" + +var _IOTypeIndex = [...]uint8{0, 9, 10, 11, 12} + +func (i IOType) String() string { + if i < 0 || i >= IOType(len(_IOTypeIndex)-1) { + return fmt.Sprintf("IOType(%d)", i) + } + return _IOTypeName[_IOTypeIndex[i]:_IOTypeIndex[i+1]] +} + +var _IOTypeValues = []IOType{0, 1, 2, 3} + +var _IOTypeNameToValueMap = map[string]IOType{ + _IOTypeName[0:9]: 0, + _IOTypeName[9:10]: 1, + _IOTypeName[10:11]: 2, + _IOTypeName[11:12]: 3, +} + +// IOTypeString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func IOTypeString(s string) (IOType, error) { + if val, ok := _IOTypeNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to IOType values", s) +} + +// IOTypeValues returns all values of the enum +func IOTypeValues() []IOType { + return _IOTypeValues +} + +// IsAIOType returns "true" if the value is listed in the enum definition. "false" otherwise +func (i IOType) IsAIOType() bool { + for _, v := range _IOTypeValues { + if i == v { + return true + } + } + return false +} From 571d04f27b313f6b3360153216f180b106d683e3 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 21 Apr 2023 20:10:24 -0700 Subject: [PATCH 08/15] use the name artifact type instead Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 8 ++--- pkg/common/artifacttype_enumer.go | 51 +++++++++++++++++++++++++++++++ pkg/common/flyte_url.go | 45 +++++++++++---------------- pkg/common/iotype_enumer.go | 51 ------------------------------- 4 files changed, 73 insertions(+), 82 deletions(-) create mode 100644 pkg/common/artifacttype_enumer.go delete mode 100644 pkg/common/iotype_enumer.go diff --git a/dataproxy/service.go b/dataproxy/service.go index 942f3a46c..4aea7e720 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -287,9 +287,9 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( } var lm *core.LiteralMap - if ioType == common.IOType_i { + if ioType == common.ArtifactTypeI { lm = resp.FullInputs - } else if ioType == common.IOType_o { + } else if ioType == common.ArtifactTypeO { lm = resp.FullOutputs } else { // Assume deck, and create a download link request @@ -329,9 +329,9 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( return nil, err } - if ioType == common.IOType_i { + if ioType == common.ArtifactTypeI { lm = resp.FullInputs - } else if ioType == common.IOType_o { + } else if ioType == common.ArtifactTypeO { lm = resp.FullOutputs } else { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "deck type cannot be specified with a retry attempt, just use the node instead") diff --git a/pkg/common/artifacttype_enumer.go b/pkg/common/artifacttype_enumer.go new file mode 100644 index 000000000..6847e6b7d --- /dev/null +++ b/pkg/common/artifacttype_enumer.go @@ -0,0 +1,51 @@ +// Code generated by "enumer --type=ArtifactType --trimprefix=ArtifactType -transform=snake"; DO NOT EDIT. + +package common + +import ( + "fmt" +) + +const _ArtifactTypeName = "undefinediod" + +var _ArtifactTypeIndex = [...]uint8{0, 9, 10, 11, 12} + +func (i ArtifactType) String() string { + if i < 0 || i >= ArtifactType(len(_ArtifactTypeIndex)-1) { + return fmt.Sprintf("ArtifactType(%d)", i) + } + return _ArtifactTypeName[_ArtifactTypeIndex[i]:_ArtifactTypeIndex[i+1]] +} + +var _ArtifactTypeValues = []ArtifactType{0, 1, 2, 3} + +var _ArtifactTypeNameToValueMap = map[string]ArtifactType{ + _ArtifactTypeName[0:9]: 0, + _ArtifactTypeName[9:10]: 1, + _ArtifactTypeName[10:11]: 2, + _ArtifactTypeName[11:12]: 3, +} + +// ArtifactTypeString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func ArtifactTypeString(s string) (ArtifactType, error) { + if val, ok := _ArtifactTypeNameToValueMap[s]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to ArtifactType values", s) +} + +// ArtifactTypeValues returns all values of the enum +func ArtifactTypeValues() []ArtifactType { + return _ArtifactTypeValues +} + +// IsAArtifactType returns "true" if the value is listed in the enum definition. "false" otherwise +func (i ArtifactType) IsAArtifactType() bool { + for _, v := range _ArtifactTypeValues { + if i == v { + return true + } + } + return false +} diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index 5c57634e9..2d9991136 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -10,30 +10,29 @@ import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" ) -//go:generate enumer --type=IOType --trimprefix=IOType_ +// transform to snake case to make lower case +//go:generate enumer --type=ArtifactType --trimprefix=ArtifactType -transform=snake -type IOType int +type ArtifactType int // The suffixes in these constants are used to match against the tail end of the flyte url, to keep tne flyte url simpler -// -//nolint:all const ( - IOType_undefined IOType = iota - IOType_i - IOType_o - IOType_d + ArtifactTypeUndefined ArtifactType = iota + ArtifactTypeI + ArtifactTypeO + ArtifactTypeD ) var re = regexp.MustCompile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])") -func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, IOType, error) { +func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, ArtifactType, error) { // flyteURL is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] // where i stands for inputs.pb o for outputs.pb and d for the flyte deck // If the retry attempt is missing, the io requested is assumed to be for the node instead of the task execution re.MatchString(flyteURL) matches := re.FindStringSubmatch(flyteURL) if len(matches) != 7 && len(matches) != 6 { - return core.NodeExecutionIdentifier{}, nil, IOType_undefined, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) + return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) } proj := matches[1] domain := matches[2] @@ -43,23 +42,15 @@ func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, IOType, if len(matches) == 7 && matches[5] != "" { a, err := strconv.Atoi(matches[5]) if err != nil { - return core.NodeExecutionIdentifier{}, nil, IOType_undefined, fmt.Errorf("failed to parse attempt, %s", err) + return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, fmt.Errorf("failed to parse attempt, %s", err) } attempt = &a } ioLower := strings.ToLower(matches[len(matches)-1]) - ioType, err := IOTypeString(ioLower) + ioType, err := ArtifactTypeString(ioLower) if err != nil { - return core.NodeExecutionIdentifier{}, nil, IOType_undefined, err + return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, err } - //switch matches[len(matches)-1] { - //case "i": - // ioType = I - //case "o": - // ioType = OUTPUT - //case "d": - // ioType = DECK - //} return core.NodeExecutionIdentifier{ NodeId: nodeID, @@ -76,11 +67,11 @@ func FlyteURLsFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, nodeExecutionID.ExecutionId.Domain, nodeExecutionID.ExecutionId.Name, nodeExecutionID.NodeId) res := &admin.FlyteURLs{ - Inputs: fmt.Sprintf("%s/%s", base, IOType_i), - Outputs: fmt.Sprintf("%s/%s", base, IOType_o), + Inputs: fmt.Sprintf("%s/%s", base, ArtifactTypeI), + Outputs: fmt.Sprintf("%s/%s", base, ArtifactTypeO), } if deck { - res.Deck = fmt.Sprintf("%s/%s", base, IOType_d) + res.Deck = fmt.Sprintf("%s/%s", base, ArtifactTypeD) } return res } @@ -90,11 +81,11 @@ func FlyteURLsFromTaskExecutionID(taskExecutionID core.TaskExecutionIdentifier, taskExecutionID.NodeExecutionId.ExecutionId.Domain, taskExecutionID.NodeExecutionId.ExecutionId.Name, taskExecutionID.NodeExecutionId.NodeId, strconv.Itoa(int(taskExecutionID.RetryAttempt))) res := &admin.FlyteURLs{ - Inputs: fmt.Sprintf("%s/%s", base, IOType_i), - Outputs: fmt.Sprintf("%s/%s", base, IOType_o), + Inputs: fmt.Sprintf("%s/%s", base, ArtifactTypeI), + Outputs: fmt.Sprintf("%s/%s", base, ArtifactTypeO), } if deck { - res.Deck = fmt.Sprintf("%s/%s", base, IOType_d) + res.Deck = fmt.Sprintf("%s/%s", base, ArtifactTypeD) } return res } diff --git a/pkg/common/iotype_enumer.go b/pkg/common/iotype_enumer.go deleted file mode 100644 index f68969a9d..000000000 --- a/pkg/common/iotype_enumer.go +++ /dev/null @@ -1,51 +0,0 @@ -// Code generated by "enumer --type=IOType --trimprefix=IOType_"; DO NOT EDIT. - -package common - -import ( - "fmt" -) - -const _IOTypeName = "undefinediod" - -var _IOTypeIndex = [...]uint8{0, 9, 10, 11, 12} - -func (i IOType) String() string { - if i < 0 || i >= IOType(len(_IOTypeIndex)-1) { - return fmt.Sprintf("IOType(%d)", i) - } - return _IOTypeName[_IOTypeIndex[i]:_IOTypeIndex[i+1]] -} - -var _IOTypeValues = []IOType{0, 1, 2, 3} - -var _IOTypeNameToValueMap = map[string]IOType{ - _IOTypeName[0:9]: 0, - _IOTypeName[9:10]: 1, - _IOTypeName[10:11]: 2, - _IOTypeName[11:12]: 3, -} - -// IOTypeString retrieves an enum value from the enum constants string name. -// Throws an error if the param is not part of the enum. -func IOTypeString(s string) (IOType, error) { - if val, ok := _IOTypeNameToValueMap[s]; ok { - return val, nil - } - return 0, fmt.Errorf("%s does not belong to IOType values", s) -} - -// IOTypeValues returns all values of the enum -func IOTypeValues() []IOType { - return _IOTypeValues -} - -// IsAIOType returns "true" if the value is listed in the enum definition. "false" otherwise -func (i IOType) IsAIOType() bool { - for _, v := range _IOTypeValues { - if i == v { - return true - } - } - return false -} From 8c138f0813c117f414260d8ee10ff71f09a81e44 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 1 May 2023 14:37:39 -0700 Subject: [PATCH 09/15] add unit tests Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 16 +- dataproxy/service_test.go | 117 ++++++++++-- pkg/common/flyte_url.go | 8 +- pkg/common/flyte_url_test.go | 166 ++++++++++++++++++ .../impl/node_execution_manager_test.go | 5 + .../impl/task_execution_manager_test.go | 5 + 6 files changed, 297 insertions(+), 20 deletions(-) create mode 100644 pkg/common/flyte_url_test.go diff --git a/dataproxy/service.go b/dataproxy/service.go index 4aea7e720..9327f3d5e 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -141,9 +141,17 @@ func (s Service) CreateDownloadLink(ctx context.Context, req *service.CreateDown return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create a signed url. Error: %v", err) } + u := []string{signedURLResp.URL.String()} + ts := timestamppb.New(time.Now().Add(req.ExpiresIn.AsDuration())) + + // return &service.CreateDownloadLinkResponse{ - SignedUrl: []string{signedURLResp.URL.String()}, - ExpiresAt: timestamppb.New(time.Now().Add(req.ExpiresIn.AsDuration())), + SignedUrl: u, + ExpiresAt: ts, + PreSignedUrls: &service.PreSignedURLs{ + SignedUrl: []string{signedURLResp.URL.String()}, + ExpiresAt: ts, + }, }, nil } @@ -302,8 +310,8 @@ func (s Service) GetData(ctx context.Context, req *service.GetDataRequest) ( return nil, err } return &service.GetDataResponse{ - Data: &service.GetDataResponse_FlyteDeckDownloadLink{ - FlyteDeckDownloadLink: resp, + Data: &service.GetDataResponse_PreSignedUrls{ + PreSignedUrls: resp.PreSignedUrls, }, }, nil } diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index 99b8bd9d9..87dcb938f 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -2,13 +2,11 @@ package dataproxy import ( "context" - "fmt" "testing" "time" - "github.com/flyteorg/flyteadmin/pkg/common" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/golang/protobuf/proto" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -169,16 +167,111 @@ func TestCreateDownloadLocation(t *testing.T) { }) } -func TestParseFlyteUrl(t *testing.T) { - t.Run("valid", func(t *testing.T) { - ne, attempt, kind, err := common.ParseFlyteURL("flyte://v1/fs/dev/abc/n0/0/o") - assert.NoError(t, err) - fmt.Println(ne, attempt, kind, err) - ne, attempt, kind, err = common.ParseFlyteURL("flyte://v1/fs/dev/abc/n0/i") +func TestService_GetData(t *testing.T) { + dataStore := commonMocks.GetMockStorageClient() + nodeExecutionManager := &mocks.MockNodeExecutionManager{} + taskExecutionManager := &mocks.MockTaskExecutionManager{} + s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, dataStore, taskExecutionManager) + assert.NoError(t, err) + + inputsLM := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "input": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "hello", + }, + }, + }, + }, + }, + }, + }, + } + outputsLM := &core.LiteralMap{ + Literals: map[string]*core.Literal{ + "output": { + Value: &core.Literal_Scalar{ + Scalar: &core.Scalar{ + Value: &core.Scalar_Primitive{ + Primitive: &core.Primitive{ + Value: &core.Primitive_StringValue{ + StringValue: "world", + }, + }, + }, + }, + }, + }, + }, + } + + nodeExecutionManager.SetGetNodeExecutionDataFunc( + func(ctx context.Context, request admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error) { + return &admin.NodeExecutionGetDataResponse{ + FullInputs: inputsLM, + FullOutputs: outputsLM, + }, nil + }, + ) + taskExecutionManager.SetListTaskExecutionsCallback(func(ctx context.Context, request admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) { + return &admin.TaskExecutionList{ + TaskExecutions: []*admin.TaskExecution{ + { + Id: &core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "proj", + Domain: "dev", + Name: "task", + Version: "v1", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "proj", + Domain: "dev", + Name: "wfexecid", + }, + }, + RetryAttempt: 5, + }, + }, + }, + }, nil + }) + taskExecutionManager.SetGetTaskExecutionDataCallback(func(ctx context.Context, request admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) { + return &admin.TaskExecutionGetDataResponse{ + FullInputs: inputsLM, + FullOutputs: outputsLM, + }, nil + }) + + t.Run("get a working set of urls without retry attempt", func(t *testing.T) { + res, err := s.GetData(context.Background(), &service.GetDataRequest{ + FlyteUrl: "flyte://v1/proj/dev/wfexecid/n0-d0/i", + }) assert.NoError(t, err) - fmt.Println(ne, attempt, kind, err) - ne, attempt, kind, err = common.ParseFlyteURL("flyte://v1/fs/dev/abc/n0/d") + assert.True(t, proto.Equal(inputsLM, res.GetLiteralMap())) + assert.Nil(t, res.GetPreSignedUrls()) + }) + + t.Run("get a working set of urls with a retry attempt", func(t *testing.T) { + res, err := s.GetData(context.Background(), &service.GetDataRequest{ + FlyteUrl: "flyte://v1/proj/dev/wfexecid/n0-d0/5/o", + }) assert.NoError(t, err) - fmt.Println(ne, attempt, kind, err) + assert.True(t, proto.Equal(outputsLM, res.GetLiteralMap())) + assert.Nil(t, res.GetPreSignedUrls()) + }) + + t.Run("Bad URL", func(t *testing.T) { + _, err = s.GetData(context.Background(), &service.GetDataRequest{ + FlyteUrl: "flyte://v3/blah/lorem/m0-fdj", + }) + assert.Error(t, err) }) } diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index 2d9991136..34e3c62fc 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -18,12 +18,12 @@ type ArtifactType int // The suffixes in these constants are used to match against the tail end of the flyte url, to keep tne flyte url simpler const ( ArtifactTypeUndefined ArtifactType = iota - ArtifactTypeI - ArtifactTypeO - ArtifactTypeD + ArtifactTypeI // inputs + ArtifactTypeO // outputs + ArtifactTypeD // deck ) -var re = regexp.MustCompile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])") +var re = regexp.MustCompile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])$") func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, ArtifactType, error) { // flyteURL is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] diff --git a/pkg/common/flyte_url_test.go b/pkg/common/flyte_url_test.go new file mode 100644 index 000000000..57c2cb51a --- /dev/null +++ b/pkg/common/flyte_url_test.go @@ -0,0 +1,166 @@ +package common + +import ( + "testing" + + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" +) + +func TestParseFlyteUrl(t *testing.T) { + t.Run("valid", func(t *testing.T) { + ne, attempt, kind, err := ParseFlyteURL("flyte://v1/fs/dev/abc/n0/0/o") + assert.NoError(t, err) + assert.Equal(t, 0, *attempt) + assert.Equal(t, ArtifactTypeO, kind) + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + }, &ne)) + ne, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0/i") + assert.NoError(t, err) + assert.Nil(t, attempt) + assert.Equal(t, ArtifactTypeI, kind) + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + }, &ne)) + + ne, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0/d") + assert.NoError(t, err) + assert.Nil(t, attempt) + assert.Equal(t, ArtifactTypeD, kind) + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + }, &ne)) + + ne, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0-dn0-9-n0-n0/d") + assert.NoError(t, err) + assert.Nil(t, attempt) + assert.Equal(t, ArtifactTypeD, kind) + assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{ + NodeId: "n0-dn0-9-n0-n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + }, &ne)) + }) + + t.Run("invalid", func(t *testing.T) { + // more than one character + _, attempt, kind, err := ParseFlyteURL("flyte://v1/fs/dev/abc/n0/0/od") + assert.Error(t, err) + assert.Nil(t, attempt) + assert.Equal(t, ArtifactTypeUndefined, kind) + + _, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/abc/n0/input") + assert.Error(t, err) + assert.Nil(t, attempt) + assert.Equal(t, ArtifactTypeUndefined, kind) + + // non integer for attempt + _, attempt, kind, err = ParseFlyteURL("flyte://v1/fs/dev/ab/n0/a/i") + assert.Error(t, err) + assert.Nil(t, attempt) + assert.Equal(t, ArtifactTypeUndefined, kind) + }) +} + +func TestFlyteURLsFromNodeExecutionID(t *testing.T) { + t.Run("with deck", func(t *testing.T) { + ne := core.NodeExecutionIdentifier{ + NodeId: "n0-dn0-n1", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + } + urls := FlyteURLsFromNodeExecutionID(ne, true) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0-dn0-n1/i", urls.GetInputs()) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0-dn0-n1/o", urls.GetOutputs()) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0-dn0-n1/d", urls.GetDeck()) + }) + + t.Run("without deck", func(t *testing.T) { + ne := core.NodeExecutionIdentifier{ + NodeId: "n0-dn0-n1", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + } + urls := FlyteURLsFromNodeExecutionID(ne, false) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0-dn0-n1/i", urls.GetInputs()) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0-dn0-n1/o", urls.GetOutputs()) + assert.Equal(t, "", urls.GetDeck()) + }) +} + +func TestFlyteURLsFromTaskExecutionID(t *testing.T) { + t.Run("with deck", func(t *testing.T) { + te := core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "fs", + Domain: "dev", + Name: "abc", + Version: "v1", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + }, + RetryAttempt: 1, + } + urls := FlyteURLsFromTaskExecutionID(te, true) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0/1/i", urls.GetInputs()) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0/1/o", urls.GetOutputs()) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0/1/d", urls.GetDeck()) + }) + + t.Run("without deck", func(t *testing.T) { + te := core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Project: "fs", + Domain: "dev", + Name: "abc", + Version: "v1", + }, + NodeExecutionId: &core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "fs", + Domain: "dev", + Name: "abc", + }, + }, + } + urls := FlyteURLsFromTaskExecutionID(te, false) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0/0/i", urls.GetInputs()) + assert.Equal(t, "flyte://v1/fs/dev/abc/n0/0/o", urls.GetOutputs()) + assert.Equal(t, "", urls.GetDeck()) + }) +} diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index c6b4edb27..53c928583 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -1311,5 +1311,10 @@ func TestGetNodeExecutionData(t *testing.T) { Id: dynamicWorkflowClosure.Primary.Template.Id, CompiledWorkflow: &dynamicWorkflowClosure, }, + FlyteUrls: &admin.FlyteURLs{ + Inputs: "flyte://v1/project/domain/name/node id/i", + Outputs: "flyte://v1/project/domain/name/node id/o", + Deck: "flyte://v1/project/domain/name/node id/d", + }, }, dataResponse)) } diff --git a/pkg/manager/impl/task_execution_manager_test.go b/pkg/manager/impl/task_execution_manager_test.go index cc59012bb..4c190d0bf 100644 --- a/pkg/manager/impl/task_execution_manager_test.go +++ b/pkg/manager/impl/task_execution_manager_test.go @@ -958,5 +958,10 @@ func TestGetTaskExecutionData(t *testing.T) { }, FullInputs: fullInputs, FullOutputs: fullOutputs, + FlyteUrls: &admin.FlyteURLs{ + Inputs: "flyte://v1/project/domain/name/node-id/1/i", + Outputs: "flyte://v1/project/domain/name/node-id/1/o", + Deck: "", + }, }, dataResponse)) } From 60ad5424b11545eee4ede1c323f233d88efc72ff Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 1 May 2023 18:24:00 -0700 Subject: [PATCH 10/15] use released flyteidl Signed-off-by: Yee Hing Tong --- go.mod | 3 ++- go.sum | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 93b2855f6..02e1cd52a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cloudevents/sdk-go/v2 v2.8.0 github.com/coreos/go-oidc v2.2.1+incompatible github.com/evanphx/json-patch v4.12.0+incompatible - github.com/flyteorg/flyteidl v1.3.13 + github.com/flyteorg/flyteidl v1.5.0 github.com/flyteorg/flyteplugins v1.0.40 github.com/flyteorg/flytepropeller v1.1.70 github.com/flyteorg/flytestdlib v1.0.15 @@ -209,4 +209,5 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 + replace github.com/flyteorg/flyteidl => ../flyteidl diff --git a/go.sum b/go.sum index 638d2b84d..a69aa6bcf 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,6 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.3.13 h1:jOjiHl6jmSCOGC094QaRdSjjhThhzYPm0jHSxwAZ6UM= -github.com/flyteorg/flyteidl v1.3.13/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s= github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w= From b8a6b5122c0033086a3d9e53b120755bfdc96395 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Mon, 1 May 2023 19:09:27 -0700 Subject: [PATCH 11/15] remove replace Signed-off-by: Yee Hing Tong --- go.mod | 2 -- go.sum | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 02e1cd52a..6d78ffdd1 100644 --- a/go.mod +++ b/go.mod @@ -209,5 +209,3 @@ require ( ) replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.20210825070134-bfc34418fe84 - -replace github.com/flyteorg/flyteidl => ../flyteidl diff --git a/go.sum b/go.sum index a69aa6bcf..8e166ce64 100644 --- a/go.sum +++ b/go.sum @@ -312,6 +312,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/flyteorg/flyteidl v1.5.0 h1:vdaA5Cg9eqi5NMuASSod/AE7RXlHvzdWjSL9abDyd/M= +github.com/flyteorg/flyteidl v1.5.0/go.mod h1:ckLjB51moX4L0oQml+WTCrPK50zrJf6IZJ6LPC0RB4I= github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s= github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio= github.com/flyteorg/flytepropeller v1.1.70 h1:/d1qqz13rdVADM85ST70eerAdBstJJz9UUB/mNSZi0w= From 81e7deb83e1cf9407b93c0c32dff6b441f803e30 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Wed, 3 May 2023 17:40:57 -0700 Subject: [PATCH 12/15] Update dataproxy/service.go Co-authored-by: Haytham Abuelfutuh Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataproxy/service.go b/dataproxy/service.go index 9327f3d5e..89db6dc62 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -248,7 +248,7 @@ func createStorageLocation(ctx context.Context, store *storage.DataStore, } func (s Service) validateResolveArtifactRequest(req *service.GetDataRequest) error { - if req.GetFlyteUrl() == "" { + if len(req.GetFlyteUrl()) == 0 { return fmt.Errorf("source is required. Provided empty string") } if !strings.HasPrefix(req.GetFlyteUrl(), "flyte://") { From 5533ae83de08e8e2b0c6ea0ea37a8ba3c516a5c6 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 5 May 2023 12:52:10 -0700 Subject: [PATCH 13/15] pr comments Signed-off-by: Yee Hing Tong --- dataproxy/service.go | 5 +++- dataproxy/service_test.go | 47 ++++++++++++++++++++++++++++++++++-- pkg/common/flyte_url.go | 37 ++++++++++++++++------------ pkg/common/flyte_url_test.go | 8 ++++++ 4 files changed, 79 insertions(+), 18 deletions(-) diff --git a/dataproxy/service.go b/dataproxy/service.go index 89db6dc62..5ab9e32a8 100644 --- a/dataproxy/service.go +++ b/dataproxy/service.go @@ -264,9 +264,12 @@ func (s Service) GetTaskExecutionID(ctx context.Context, attempt int, nodeExecID Limit: 1, Filters: fmt.Sprintf("eq(retry_attempt,%s)", strconv.Itoa(attempt)), }) - if err != nil || len(taskExecs.TaskExecutions) == 0 { + if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "failed to list task executions [%v]. Error: %v", nodeExecID, err) } + if len(taskExecs.TaskExecutions) == 0 { + return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "no task executions were listed [%v]. Error: %v", nodeExecID, err) + } taskExec := taskExecs.TaskExecutions[0] return taskExec.Id, nil } diff --git a/dataproxy/service_test.go b/dataproxy/service_test.go index 87dcb938f..db1c0e61d 100644 --- a/dataproxy/service_test.go +++ b/dataproxy/service_test.go @@ -15,10 +15,10 @@ import ( commonMocks "github.com/flyteorg/flyteadmin/pkg/common/mocks" stdlibConfig "github.com/flyteorg/flytestdlib/config" - "google.golang.org/protobuf/types/known/durationpb" - + "github.com/flyteorg/flyteadmin/pkg/errors" "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/promutils/labeled" + "google.golang.org/protobuf/types/known/durationpb" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" @@ -275,3 +275,46 @@ func TestService_GetData(t *testing.T) { assert.Error(t, err) }) } + +func TestService_Error(t *testing.T) { + dataStore := commonMocks.GetMockStorageClient() + nodeExecutionManager := &mocks.MockNodeExecutionManager{} + taskExecutionManager := &mocks.MockTaskExecutionManager{} + s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, dataStore, taskExecutionManager) + assert.NoError(t, err) + + t.Run("get a working set of urls without retry attempt", func(t *testing.T) { + taskExecutionManager.SetListTaskExecutionsCallback(func(ctx context.Context, request admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) { + return nil, errors.NewFlyteAdminErrorf(1, "not found") + }) + nodeExecID := core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "proj", + Domain: "dev", + Name: "wfexecid", + }, + } + _, err := s.GetTaskExecutionID(context.Background(), 0, nodeExecID) + assert.Error(t, err, "failed to list") + }) + + t.Run("get a working set of urls without retry attempt", func(t *testing.T) { + taskExecutionManager.SetListTaskExecutionsCallback(func(ctx context.Context, request admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) { + return &admin.TaskExecutionList{ + TaskExecutions: nil, + Token: "", + }, nil + }) + nodeExecID := core.NodeExecutionIdentifier{ + NodeId: "n0", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "proj", + Domain: "dev", + Name: "wfexecid", + }, + } + _, err := s.GetTaskExecutionID(context.Background(), 0, nodeExecID) + assert.Error(t, err, "no task executions") + }) +} diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index 34e3c62fc..ec9a3ca08 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -4,7 +4,6 @@ import ( "fmt" "regexp" "strconv" - "strings" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -23,31 +22,39 @@ const ( ArtifactTypeD // deck ) -var re = regexp.MustCompile("flyte://v1/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)/([a-zA-Z0-9_-]+)(?:/([0-9]+))?/([iod])$") +var re = regexp.MustCompile("flyte://v1/(?P[a-zA-Z0-9_-]+)/(?P[a-zA-Z0-9_-]+)/(?P[a-zA-Z0-9_-]+)/(?P[a-zA-Z0-9_-]+)(?:/(?P[0-9]+))?/(?P[iod])$") + +func MatchRegex(reg *regexp.Regexp, input string) map[string]string { + names := reg.SubexpNames() + res := reg.FindAllStringSubmatch(input, -1) + if len(res) == 0 { + return nil + } + dict := make(map[string]string, len(names)) + for i := 1; i < len(res[0]); i++ { + dict[names[i]] = res[0][i] + } + return dict +} func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, ArtifactType, error) { // flyteURL is of the form flyte://v1/project/domain/execution_id/node_id/attempt/[iod] // where i stands for inputs.pb o for outputs.pb and d for the flyte deck // If the retry attempt is missing, the io requested is assumed to be for the node instead of the task execution - re.MatchString(flyteURL) - matches := re.FindStringSubmatch(flyteURL) - if len(matches) != 7 && len(matches) != 6 { - return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, fmt.Errorf("failed to parse flyte url, only %d matches found", len(matches)) - } - proj := matches[1] - domain := matches[2] - executionID := matches[3] - nodeID := matches[4] + matches := MatchRegex(re, flyteURL) + proj := matches["project"] + domain := matches["domain"] + executionID := matches["exec"] + nodeID := matches["node"] var attempt *int // nil means node execution, not a task execution - if len(matches) == 7 && matches[5] != "" { - a, err := strconv.Atoi(matches[5]) + if matches["attempt"] != "" { + a, err := strconv.Atoi(matches["attempt"]) if err != nil { return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, fmt.Errorf("failed to parse attempt, %s", err) } attempt = &a } - ioLower := strings.ToLower(matches[len(matches)-1]) - ioType, err := ArtifactTypeString(ioLower) + ioType, err := ArtifactTypeString(matches["artifactType"]) if err != nil { return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, err } diff --git a/pkg/common/flyte_url_test.go b/pkg/common/flyte_url_test.go index 57c2cb51a..378860cf1 100644 --- a/pkg/common/flyte_url_test.go +++ b/pkg/common/flyte_url_test.go @@ -164,3 +164,11 @@ func TestFlyteURLsFromTaskExecutionID(t *testing.T) { assert.Equal(t, "", urls.GetDeck()) }) } + +func TestMatchRegexDirectly(t *testing.T) { + result := MatchRegex(re, "flyte://v1/fs/dev/abc/n0-dn0-9-n0-n0/i") + assert.Equal(t, "", result["attempt"]) + + result = MatchRegex(re, "flyteff://v2/fs/dfdsaev/abc/n0-dn0-9-n0-n0/i") + assert.Nil(t, result) +} From 19b744388a785e99ae1e8f99a25489002f24e632 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 5 May 2023 14:20:40 -0700 Subject: [PATCH 14/15] Update pkg/common/flyte_url.go Co-authored-by: Haytham Abuelfutuh Signed-off-by: Yee Hing Tong --- pkg/common/flyte_url.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index ec9a3ca08..9801994d4 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -47,10 +47,10 @@ func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, Artifac executionID := matches["exec"] nodeID := matches["node"] var attempt *int // nil means node execution, not a task execution - if matches["attempt"] != "" { - a, err := strconv.Atoi(matches["attempt"]) + if attempt := matches["attempt"]; len(attempt) > 0 { + a, err := strconv.Atoi(attempt) if err != nil { - return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, fmt.Errorf("failed to parse attempt, %s", err) + return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, fmt.Errorf("failed to parse attempt [%v], %v", attempt, err) } attempt = &a } From 9f919a2f232d065708f23f6db128e90721ae69bd Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 5 May 2023 14:24:19 -0700 Subject: [PATCH 15/15] change var name Signed-off-by: Yee Hing Tong --- pkg/common/flyte_url.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/common/flyte_url.go b/pkg/common/flyte_url.go index 9801994d4..b3094d586 100644 --- a/pkg/common/flyte_url.go +++ b/pkg/common/flyte_url.go @@ -46,13 +46,13 @@ func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, Artifac domain := matches["domain"] executionID := matches["exec"] nodeID := matches["node"] - var attempt *int // nil means node execution, not a task execution + var attemptPtr *int // nil means node execution, not a task execution if attempt := matches["attempt"]; len(attempt) > 0 { a, err := strconv.Atoi(attempt) if err != nil { return core.NodeExecutionIdentifier{}, nil, ArtifactTypeUndefined, fmt.Errorf("failed to parse attempt [%v], %v", attempt, err) } - attempt = &a + attemptPtr = &a } ioType, err := ArtifactTypeString(matches["artifactType"]) if err != nil { @@ -66,7 +66,7 @@ func ParseFlyteURL(flyteURL string) (core.NodeExecutionIdentifier, *int, Artifac Domain: domain, Name: executionID, }, - }, attempt, ioType, nil + }, attemptPtr, ioType, nil } func FlyteURLsFromNodeExecutionID(nodeExecutionID core.NodeExecutionIdentifier, deck bool) *admin.FlyteURLs {