Skip to content

Commit

Permalink
Reporting more complete metadata for map task subtasks (flyteorg#417)
Browse files Browse the repository at this point in the history
* fixed potential nil pointer

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* added CacheStatus and Logs fields to ExternalResourceInfo proto

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins version

Signed-off-by: Daniel Rammer <[email protected]>

* bumping flyteplugins version

Signed-off-by: Daniel Rammer <[email protected]>

* setting TaskExecutionEvent version to 1

Signed-off-by: Daniel Rammer <[email protected]>

* bumped flyteidl and flyteplugins version

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 8, 2022
1 parent f475c6c commit c4553bb
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
4 changes: 2 additions & 2 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.24.9
github.com/flyteorg/flyteplugins v0.10.16
github.com/flyteorg/flyteidl v0.24.17
github.com/flyteorg/flyteplugins v0.10.19
github.com/flyteorg/flytestdlib v0.4.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
10 changes: 5 additions & 5 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,11 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.10.0 h1:s36xzo75JdqLaaWoiEHk767eHiwo0598uUxyfiPkDsg=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.9 h1:wmZ/JEiCQ8cR2mkpFsvwwoUdz+g9GotoifBjLqXh7QY=
github.com/flyteorg/flyteidl v0.24.9/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw=
github.com/flyteorg/flyteplugins v0.10.16/go.mod h1:YBWV8QnFakDJfLyua8pYddiWqszAqseBKIJPNMERlos=
github.com/flyteorg/flyteidl v0.24.7/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.17 h1:Xx70bJbuQGyvS8uAyU4AN74rot6KnzJ9r/L9gcCdEsU=
github.com/flyteorg/flyteidl v0.24.17/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteplugins v0.10.19 h1:9fY3aYXfjVR8jyb4omdWu9RW2FwcmAnld9PHnR0BLW8=
github.com/flyteorg/flyteplugins v0.10.19/go.mod h1:C2va2hfD7mBi24dXRhBi0GIKG4dzFhSR27GsCCFDzss=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.4.13 h1:TzgqhECRGfOHYH1A7rUwcKEEH2rTtPxGy+oYcif7iBw=
github.com/flyteorg/flytestdlib v0.4.13/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs=
Expand Down
11 changes: 9 additions & 2 deletions flytepropeller/pkg/controller/nodes/task/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
)

// This is used by flyteadmin to indicate that map tasks now report subtask metadata individually.
var taskExecutionEventVersion = int32(1)

func ToTransitionType(ttype pluginCore.TransitionType) handler.TransitionType {
if ttype == pluginCore.TransitionTypeBarrier {
return handler.TransitionTypeBarrier
Expand Down Expand Up @@ -106,13 +109,16 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio
ResourcePoolInfo: input.ResourcePoolInfo,
}

externalResources := input.Info.Info().ExternalResources
if externalResources != nil {
if input.Info.Info() != nil && input.Info.Info().ExternalResources != nil {
externalResources := input.Info.Info().ExternalResources

metadata.ExternalResources = make([]*event.ExternalResourceInfo, len(externalResources))
for idx, e := range input.Info.Info().ExternalResources {
metadata.ExternalResources[idx] = &event.ExternalResourceInfo{
ExternalId: e.ExternalID,
CacheStatus: e.CacheStatus,
Index: e.Index,
Logs: e.Logs,
RetryAttempt: e.RetryAttempt,
Phase: ToTaskEventPhase(e.Phase),
}
Expand All @@ -131,6 +137,7 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio
TaskType: input.TaskType,
Reason: input.Info.Reason(),
Metadata: metadata,
EventVersion: taskExecutionEventVersion,
}

if input.Info.Phase().IsSuccess() && input.OutputWriter != nil {
Expand Down

0 comments on commit c4553bb

Please sign in to comment.