diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 57b51019d1..dc5028e081 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -359,8 +359,6 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flyteorg/flyteidl v0.23.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.24.0/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= -github.com/flyteorg/flyteidl v0.24.10-0.20220331230832-978d1aea7412 h1:njI9nBHdLOa9Ndueg8GGDAiYtft4ri8HmesgdXoZpO4= -github.com/flyteorg/flyteidl v0.24.10-0.20220331230832-978d1aea7412/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= github.com/flyteorg/flyteidl v0.24.10 h1:fCYpfp5fxKbhRMSkP0Hdw5lOPBTItLU1A3WA1Lc7sEU= github.com/flyteorg/flyteidl v0.24.10/go.mod h1:vHSugApgS3hRITIafzQDU8DZD/W8wFRfFcgaFU35Dww= github.com/flyteorg/flyteplugins v0.10.16 h1:rwNI2MACPbcST2O6CEUsNW6bccz7ZLni0GiY3orevfw= diff --git a/flyteadmin/pkg/repositories/transformers/task_execution.go b/flyteadmin/pkg/repositories/transformers/task_execution.go index d2427fad20..c46c858e25 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution.go @@ -2,6 +2,7 @@ package transformers import ( "context" + "sort" "strconv" "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces" @@ -15,6 +16,7 @@ import ( "github.com/flyteorg/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flytestdlib/logger" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" @@ -122,6 +124,18 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode InputURI: input.Request.Event.InputUri, } + metadata := input.Request.Event.Metadata + if metadata != nil && len(metadata.ExternalResources) > 1 { + sort.Slice(metadata.ExternalResources, func(i, j int) bool { + a := metadata.ExternalResources[i] + b := metadata.ExternalResources[j] + if a.GetIndex() == b.GetIndex() { + return a.GetRetryAttempt() < b.GetRetryAttempt() + } + return a.GetIndex() < b.GetIndex() + }) + } + closure := &admin.TaskExecutionClosure{ Phase: input.Request.Event.Phase, UpdatedAt: input.Request.Event.OccurredAt, @@ -130,6 +144,7 @@ func CreateTaskExecutionModel(ctx context.Context, input CreateTaskExecutionMode CustomInfo: input.Request.Event.CustomInfo, Reason: input.Request.Event.Reason, TaskType: input.Request.Event.TaskType, + Metadata: metadata, } eventPhase := input.Request.Event.Phase @@ -240,6 +255,90 @@ func mergeCustom(existing, latest *_struct.Struct) (*_struct.Struct, error) { return &response, nil } +// mergeExternalResource combines the lastest ExternalResourceInfo proto with an existing instance +// by updating fields and merging logs. +func mergeExternalResource(existing, latest *event.ExternalResourceInfo) *event.ExternalResourceInfo { + if existing == nil { + return latest + } + + if latest == nil { + return existing + } + + if latest.ExternalId != "" && existing.ExternalId != latest.ExternalId { + existing.ExternalId = latest.ExternalId + } + // note we are not updating existing.Index and existing.RetryAttempt because they are the + // search key for our ExternalResource pool. + existing.Phase = latest.Phase + if latest.CacheStatus != core.CatalogCacheStatus_CACHE_DISABLED && existing.CacheStatus != latest.CacheStatus { + existing.CacheStatus = latest.CacheStatus + } + existing.Logs = mergeLogs(existing.Logs, latest.Logs) + + return existing +} + +// mergeExternalResources combines lists of external resources. This involves appending new +// resources and updating in-place resources attributes. +func mergeExternalResources(existing, latest []*event.ExternalResourceInfo) []*event.ExternalResourceInfo { + if len(latest) == 0 { + return existing + } + + for _, externalResource := range latest { + // we use a binary search over the ExternalResource Index and RetryAttempt fields to + // determine if a new subtask is being reported or an existing is being updated. it is + // important to note that this means anytime more than one ExternalResource is reported + // they must set the Index field. + index := sort.Search(len(existing), func(i int) bool { + if existing[i].GetIndex() == externalResource.GetIndex() { + return existing[i].GetRetryAttempt() >= externalResource.GetRetryAttempt() + } + return existing[i].GetIndex() >= externalResource.GetIndex() + }) + + if index >= len(existing) { + existing = append(existing, externalResource) + } else if existing[index].GetIndex() == externalResource.GetIndex() && existing[index].GetRetryAttempt() == externalResource.GetRetryAttempt() { + existing[index] = mergeExternalResource(existing[index], externalResource) + } else { + existing = append(existing, &event.ExternalResourceInfo{}) + copy(existing[index+1:], existing[index:]) + existing[index] = externalResource + } + } + + return existing +} + +// mergeMetadata merges an existing TaskExecutionMetadata instance with the provided instance. This +// includes updating non-defaulted fields and merging ExternalResources. +func mergeMetadata(existing, latest *event.TaskExecutionMetadata) *event.TaskExecutionMetadata { + if existing == nil { + return latest + } + + if latest == nil { + return existing + } + + if latest.GeneratedName != "" && existing.GeneratedName != latest.GeneratedName { + existing.GeneratedName = latest.GeneratedName + } + existing.ExternalResources = mergeExternalResources(existing.ExternalResources, latest.ExternalResources) + existing.ResourcePoolInfo = latest.ResourcePoolInfo + if latest.PluginIdentifier != "" && existing.PluginIdentifier != latest.PluginIdentifier { + existing.PluginIdentifier = latest.PluginIdentifier + } + if latest.InstanceClass != event.TaskExecutionMetadata_DEFAULT && existing.InstanceClass != latest.InstanceClass { + existing.InstanceClass = latest.InstanceClass + } + + return existing +} + func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution, inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error { var taskExecutionClosure admin.TaskExecutionClosure @@ -272,8 +371,9 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE } taskExecutionClosure.CustomInfo, err = mergeCustom(taskExecutionClosure.CustomInfo, request.Event.CustomInfo) if err != nil { - return errors.NewFlyteAdminErrorf(codes.Internal, "failed to merge task even custom_info with error: %v", err) + return errors.NewFlyteAdminErrorf(codes.Internal, "failed to merge task event custom_info with error: %v", err) } + taskExecutionClosure.Metadata = mergeMetadata(taskExecutionClosure.Metadata, request.Event.Metadata) marshaledClosure, err := proto.Marshal(&taskExecutionClosure) if err != nil { return errors.NewFlyteAdminErrorf( diff --git a/flyteadmin/pkg/repositories/transformers/task_execution_test.go b/flyteadmin/pkg/repositories/transformers/task_execution_test.go index 5d5b982a40..03647ecec5 100644 --- a/flyteadmin/pkg/repositories/transformers/task_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/task_execution_test.go @@ -843,3 +843,342 @@ func TestMergeCustoms(t *testing.T) { }) } + +func TestMergeExternalResource(t *testing.T) { + type testCase struct { + existing *event.ExternalResourceInfo + latest *event.ExternalResourceInfo + expected *event.ExternalResourceInfo + name string + } + + testCases := []testCase{ + { + existing: nil, + latest: nil, + expected: nil, + name: "do nothing", + }, + { + existing: &event.ExternalResourceInfo{ + ExternalId: "foo", + }, + latest: nil, + expected: &event.ExternalResourceInfo{ + ExternalId: "foo", + }, + name: "use existing", + }, + { + existing: nil, + latest: &event.ExternalResourceInfo{ + ExternalId: "foo", + }, + expected: &event.ExternalResourceInfo{ + ExternalId: "foo", + }, + name: "use latest", + }, + { + existing: &event.ExternalResourceInfo{ + ExternalId: "foo", + CacheStatus: core.CatalogCacheStatus_CACHE_HIT, + RetryAttempt: 1, + Phase: core.TaskExecution_RUNNING, + }, + latest: &event.ExternalResourceInfo{ + Phase: core.TaskExecution_SUCCEEDED, + }, + expected: &event.ExternalResourceInfo{ + ExternalId: "foo", + CacheStatus: core.CatalogCacheStatus_CACHE_HIT, + RetryAttempt: 1, + Phase: core.TaskExecution_SUCCEEDED, + }, + name: "update phase", + }, + { + existing: &event.ExternalResourceInfo{}, + latest: &event.ExternalResourceInfo{ + ExternalId: "foo", + CacheStatus: core.CatalogCacheStatus_CACHE_HIT, + Phase: core.TaskExecution_RUNNING, + }, + expected: &event.ExternalResourceInfo{ + ExternalId: "foo", + CacheStatus: core.CatalogCacheStatus_CACHE_HIT, + Phase: core.TaskExecution_RUNNING, + }, + name: "update everything", + }, + } + + for _, mergeTestCase := range testCases { + t.Run(mergeTestCase.name, func(t *testing.T) { + actual := mergeExternalResource(mergeTestCase.existing, mergeTestCase.latest) + assert.True(t, proto.Equal(mergeTestCase.expected, actual)) + }) + } +} + +func TestMergeExternalResources(t *testing.T) { + type testCase struct { + existing []*event.ExternalResourceInfo + latest []*event.ExternalResourceInfo + expected []*event.ExternalResourceInfo + name string + } + + testCases := []testCase{ + { + existing: nil, + latest: nil, + expected: nil, + name: "do nothing", + }, + { + existing: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 1, + }, + }, + latest: nil, + expected: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 1, + }, + }, + name: "use existing", + }, + { + existing: nil, + latest: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 1, + }, + }, + expected: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 1, + }, + }, + name: "use latest", + }, + { + existing: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + }, + }, + latest: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "bar", + Index: 0, + }, + }, + expected: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "bar", + Index: 0, + }, + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + }, + }, + name: "add subtask before", + }, + { + existing: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + }, + }, + latest: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 2, + }, + }, + expected: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + }, + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 2, + }, + }, + name: "add subtask after", + }, + { + existing: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + }, + }, + latest: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + RetryAttempt: 1, + }, + }, + expected: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + }, + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 1, + RetryAttempt: 1, + }, + }, + name: "add subtask retry", + }, + { + existing: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 0, + RetryAttempt: 0, + Phase: core.TaskExecution_UNDEFINED, + }, + &event.ExternalResourceInfo{ + ExternalId: "bar", + Index: 1, + RetryAttempt: 0, + Phase: core.TaskExecution_UNDEFINED, + }, + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 2, + RetryAttempt: 0, + Phase: core.TaskExecution_UNDEFINED, + }, + }, + latest: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 2, + RetryAttempt: 0, + Phase: core.TaskExecution_RUNNING, + }, + }, + expected: []*event.ExternalResourceInfo{ + &event.ExternalResourceInfo{ + ExternalId: "foo", + Index: 0, + RetryAttempt: 0, + Phase: core.TaskExecution_UNDEFINED, + }, + &event.ExternalResourceInfo{ + ExternalId: "bar", + Index: 1, + RetryAttempt: 0, + Phase: core.TaskExecution_UNDEFINED, + }, + &event.ExternalResourceInfo{ + ExternalId: "baz", + Index: 2, + RetryAttempt: 0, + Phase: core.TaskExecution_RUNNING, + }, + }, + name: "update subtask", + }, + } + + for _, mergeTestCase := range testCases { + t.Run(mergeTestCase.name, func(t *testing.T) { + actual := mergeExternalResources(mergeTestCase.existing, mergeTestCase.latest) + assert.Equal(t, len(mergeTestCase.expected), len(actual)) + for idx, expectedExternalResource := range mergeTestCase.expected { + assert.True(t, proto.Equal(expectedExternalResource, actual[idx])) + } + }) + } +} + +func TestMergeMetadata(t *testing.T) { + type testCase struct { + existing *event.TaskExecutionMetadata + latest *event.TaskExecutionMetadata + expected *event.TaskExecutionMetadata + name string + } + + testCases := []testCase{ + { + existing: nil, + latest: nil, + expected: nil, + name: "do nothing", + }, + { + existing: &event.TaskExecutionMetadata{}, + latest: nil, + expected: &event.TaskExecutionMetadata{}, + name: "use existing", + }, + { + existing: nil, + latest: &event.TaskExecutionMetadata{}, + expected: &event.TaskExecutionMetadata{}, + name: "use latest", + }, + { + existing: &event.TaskExecutionMetadata{ + GeneratedName: "foo", + ResourcePoolInfo: []*event.ResourcePoolInfo{}, + PluginIdentifier: "bar", + InstanceClass: 1, + }, + latest: &event.TaskExecutionMetadata{}, + expected: &event.TaskExecutionMetadata{ + GeneratedName: "foo", + ResourcePoolInfo: []*event.ResourcePoolInfo{}, + PluginIdentifier: "bar", + InstanceClass: 1, + }, + name: "no updates", + }, + { + existing: &event.TaskExecutionMetadata{ + GeneratedName: "foo", + ResourcePoolInfo: []*event.ResourcePoolInfo{}, + PluginIdentifier: "bar", + InstanceClass: 0, + }, + latest: &event.TaskExecutionMetadata{ + GeneratedName: "bar", + ResourcePoolInfo: []*event.ResourcePoolInfo{}, + PluginIdentifier: "foo", + InstanceClass: 1, + }, + expected: &event.TaskExecutionMetadata{ + GeneratedName: "bar", + ResourcePoolInfo: []*event.ResourcePoolInfo{}, + PluginIdentifier: "foo", + InstanceClass: 1, + }, + name: "all updates", + }, + } + + for _, mergeTestCase := range testCases { + t.Run(mergeTestCase.name, func(t *testing.T) { + metadata := mergeMetadata(mergeTestCase.existing, mergeTestCase.latest) + assert.True(t, proto.Equal(mergeTestCase.expected, metadata)) + }) + } +}