Skip to content

Commit

Permalink
Record TaskExecutionMetadata from task events (flyteorg#375)
Browse files Browse the repository at this point in the history
* adding TaskExecutionMetadata to closure on creation

Signed-off-by: Daniel Rammer <[email protected]>

* added TaskExecutionMetadata merging functionality

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updating each individual ExternalResource

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* updated ExternalResource index comment

Signed-off-by: Daniel Rammer <[email protected]>

* fixed typo on testing log length

Signed-off-by: Daniel Rammer <[email protected]>

* updated ExternalResourceInfo storage to use binary search over Index and RetryAttempt

Signed-off-by: Daniel Rammer <[email protected]>

* fixed sorting initial ExternalResources

Signed-off-by: Daniel Rammer <[email protected]>

* actually fixed it now

Signed-off-by: Daniel Rammer <[email protected]>

* updating documentation

Signed-off-by: Daniel Rammer <[email protected]>

* fixed tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issue

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 1, 2022
1 parent 64da9d3 commit b9c0da1
Show file tree
Hide file tree
Showing 3 changed files with 440 additions and 3 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,6 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.24.10-0.20220331230832-978d1aea7412 h1:njI9nBHdLOa9Ndueg8GGDAiYtft4ri8HmesgdXoZpO4=
github.com/flyteorg/flyteidl v0.24.10-0.20220331230832-978d1aea7412/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteidl v0.24.10 h1:fCYpfp5fxKbhRMSkP0Hdw5lOPBTItLU1A3WA1Lc7sEU=
github.com/flyteorg/flyteidl v0.24.10/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww=
github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw=
Expand Down
102 changes: 101 additions & 1 deletion pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transformers

import (
"context"
"sort"
"strconv"

"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/flyteorg/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
Expand Down Expand Up @@ -122,6 +124,18 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
InputURI: input.Request.Event.InputUri,
}

metadata := input.Request.Event.Metadata
if metadata != nil && len(metadata.ExternalResources) > 1 {
sort.Slice(metadata.ExternalResources, func(i, j int) bool {
a := metadata.ExternalResources[i]
b := metadata.ExternalResources[j]
if a.GetIndex() == b.GetIndex() {
return a.GetRetryAttempt() < b.GetRetryAttempt()
}
return a.GetIndex() < b.GetIndex()
})
}

closure := &admin.TaskExecutionClosure{
Phase: input.Request.Event.Phase,
UpdatedAt: input.Request.Event.OccurredAt,
Expand All @@ -130,6 +144,7 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode
CustomInfo: input.Request.Event.CustomInfo,
Reason: input.Request.Event.Reason,
TaskType: input.Request.Event.TaskType,
Metadata: metadata,
}

eventPhase := input.Request.Event.Phase
Expand Down Expand Up @@ -240,6 +255,90 @@ func mergeCustom(existing, latest *_struct.Struct) (*_struct.Struct, error) {
return &response, nil
}

// mergeExternalResource combines the lastest ExternalResourceInfo proto with an existing instance
// by updating fields and merging logs.
func mergeExternalResource(existing, latest *event.ExternalResourceInfo) *event.ExternalResourceInfo {
if existing == nil {
return latest
}

if latest == nil {
return existing
}

if latest.ExternalId != "" && existing.ExternalId != latest.ExternalId {
existing.ExternalId = latest.ExternalId
}
// note we are not updating existing.Index and existing.RetryAttempt because they are the
// search key for our ExternalResource pool.
existing.Phase = latest.Phase
if latest.CacheStatus != core.CatalogCacheStatus_CACHE_DISABLED && existing.CacheStatus != latest.CacheStatus {
existing.CacheStatus = latest.CacheStatus
}
existing.Logs = mergeLogs(existing.Logs, latest.Logs)

return existing
}

// mergeExternalResources combines lists of external resources. This involves appending new
// resources and updating in-place resources attributes.
func mergeExternalResources(existing, latest []*event.ExternalResourceInfo) []*event.ExternalResourceInfo {
if len(latest) == 0 {
return existing
}

for _, externalResource := range latest {
// we use a binary search over the ExternalResource Index and RetryAttempt fields to
// determine if a new subtask is being reported or an existing is being updated. it is
// important to note that this means anytime more than one ExternalResource is reported
// they must set the Index field.
index := sort.Search(len(existing), func(i int) bool {
if existing[i].GetIndex() == externalResource.GetIndex() {
return existing[i].GetRetryAttempt() >= externalResource.GetRetryAttempt()
}
return existing[i].GetIndex() >= externalResource.GetIndex()
})

if index >= len(existing) {
existing = append(existing, externalResource)
} else if existing[index].GetIndex() == externalResource.GetIndex() && existing[index].GetRetryAttempt() == externalResource.GetRetryAttempt() {
existing[index] = mergeExternalResource(existing[index], externalResource)
} else {
existing = append(existing, &event.ExternalResourceInfo{})
copy(existing[index+1:], existing[index:])
existing[index] = externalResource
}
}

return existing
}

// mergeMetadata merges an existing TaskExecutionMetadata instance with the provided instance. This
// includes updating non-defaulted fields and merging ExternalResources.
func mergeMetadata(existing, latest *event.TaskExecutionMetadata) *event.TaskExecutionMetadata {
if existing == nil {
return latest
}

if latest == nil {
return existing
}

if latest.GeneratedName != "" && existing.GeneratedName != latest.GeneratedName {
existing.GeneratedName = latest.GeneratedName
}
existing.ExternalResources = mergeExternalResources(existing.ExternalResources, latest.ExternalResources)
existing.ResourcePoolInfo = latest.ResourcePoolInfo
if latest.PluginIdentifier != "" && existing.PluginIdentifier != latest.PluginIdentifier {
existing.PluginIdentifier = latest.PluginIdentifier
}
if latest.InstanceClass != event.TaskExecutionMetadata_DEFAULT && existing.InstanceClass != latest.InstanceClass {
existing.InstanceClass = latest.InstanceClass
}

return existing
}

func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution,
inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error {
var taskExecutionClosure admin.TaskExecutionClosure
Expand Down Expand Up @@ -272,8 +371,9 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
}
taskExecutionClosure.CustomInfo, err = mergeCustom(taskExecutionClosure.CustomInfo, request.Event.CustomInfo)
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to merge task even custom_info with error: %v", err)
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to merge task event custom_info with error: %v", err)
}
taskExecutionClosure.Metadata = mergeMetadata(taskExecutionClosure.Metadata, request.Event.Metadata)
marshaledClosure, err := proto.Marshal(&taskExecutionClosure)
if err != nil {
return errors.NewFlyteAdminErrorf(
Expand Down
Loading

0 comments on commit b9c0da1

Please sign in to comment.