From d9d4873d0ab27aed348b3d4737117705500d33c2 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 3 Jan 2022 17:56:22 -0600 Subject: [PATCH 01/11] replaced Metadata proto in TaskInfo with ExternalResource array Signed-off-by: Daniel Rammer --- go/tasks/pluginmachinery/core/phase.go | 13 ++++++++++-- go/tasks/plugins/array/core/state.go | 27 ++++++++++++++++++------ go/tasks/plugins/hive/execution_state.go | 16 ++++++-------- go/tasks/plugins/k8s/sagemaker/utils.go | 9 +++----- go/tasks/plugins/webapi/athena/plugin.go | 10 +++------ 5 files changed, 44 insertions(+), 31 deletions(-) diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index fe4aed060..852503127 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + //"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" // TODO hamersaw - remove "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" structpb "github.com/golang/protobuf/ptypes/struct" @@ -69,6 +69,13 @@ func (p Phase) IsWaitingForResources() bool { return p == PhaseWaitingForResources } +type ExternalResource struct { + // TODO hamersaw - document + ExternalID string + RetryAttempt int + Phase Phase +} + type TaskInfo struct { // log information for the task execution Logs []*core.TaskLog @@ -78,7 +85,9 @@ type TaskInfo struct { // Custom Event information that the plugin would like to expose to the front-end CustomInfo *structpb.Struct // Metadata around how a task was executed - Metadata *event.TaskExecutionMetadata + //Metadata *event.TaskExecutionMetadata // TODO hamersaw - remove + // A collection of information about external resources launched by this task + ExternalResources []*ExternalResource } func (t *TaskInfo) String() string { diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 2d82eb180..bbb19c672 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + //"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" // TODO hamersaw - remove "github.com/flyteorg/flytestdlib/errors" @@ -174,16 +174,31 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl phaseInfo := core.PhaseInfoUndefined t := time.Now() - nowTaskInfo := &core.TaskInfo{ - OccurredAt: &t, - Logs: logLinks, + // TODO hamersaw - remove + /*nowTaskInfo := &core.TaskInfo{ + OccurredAt: &t, + Logs: logLinks, } if nowTaskInfo.Metadata == nil { nowTaskInfo.Metadata = &event.TaskExecutionMetadata{} } - for _, subTaskID := range subTaskIDs { + for childIdx, subTaskID := range subTaskIDs { nowTaskInfo.Metadata.ExternalResources = append(nowTaskInfo.Metadata.ExternalResources, &event.ExternalResourceInfo{ - ExternalId: *subTaskID, + ExternalId: *subTaskID, + }) + }*/ + + nowTaskInfo := &core.TaskInfo{ + OccurredAt: &t, + Logs: logLinks, + ExternalResources: make([]*core.ExternalResource, len(subTaskIDs)), + } + + for childIdx, subTaskID := range subTaskIDs { + nowTaskInfo.ExternalResources = append(nowTaskInfo.ExternalResources, &core.ExternalResource{ + ExternalID: *subTaskID, + RetryAttempt: 0, // TODO hamersaw - set retry attempt + Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], }) } diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index cbc45cc06..ed6f17cfb 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -6,8 +6,6 @@ import ( "strconv" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" @@ -149,20 +147,18 @@ func ConstructTaskInfo(e ExecutionState) *core.TaskInfo { logs := make([]*idlCore.TaskLog, 0, 1) t := time.Now() - metadata := &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: e.CommandID, - }, + externalResources := []*core.ExternalResource{ + { + ExternalID: e.CommandID, }, } if e.CommandID != "" { logs = append(logs, ConstructTaskLog(e)) return &core.TaskInfo{ - Logs: logs, - OccurredAt: &t, - Metadata: metadata, + Logs: logs, + OccurredAt: &t, + ExternalResources: externalResources, } } diff --git a/go/tasks/plugins/k8s/sagemaker/utils.go b/go/tasks/plugins/k8s/sagemaker/utils.go index 7dc26ac8b..de1354ad9 100644 --- a/go/tasks/plugins/k8s/sagemaker/utils.go +++ b/go/tasks/plugins/k8s/sagemaker/utils.go @@ -6,7 +6,6 @@ import ( "sort" "strings" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template" pluginErrors "github.com/flyteorg/flyteplugins/go/tasks/errors" @@ -400,11 +399,9 @@ func createTaskInfo(_ context.Context, jobRegion string, jobName string, jobType return &pluginsCore.TaskInfo{ Logs: taskLogs, CustomInfo: customInfo, - Metadata: &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: jobName, - }, + ExternalResources: []*pluginsCore.ExternalResource{ + { + ExternalID: jobName, }, }, }, nil diff --git a/go/tasks/plugins/webapi/athena/plugin.go b/go/tasks/plugins/webapi/athena/plugin.go index f2cbba436..c39d9104a 100644 --- a/go/tasks/plugins/webapi/athena/plugin.go +++ b/go/tasks/plugins/webapi/athena/plugin.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - errors2 "github.com/flyteorg/flyteplugins/go/tasks/errors" awsSdk "github.com/aws/aws-sdk-go-v2/aws" @@ -186,11 +184,9 @@ func createTaskInfo(queryID string, cfg awsSdk.Config) *core.TaskInfo { Name: "Athena Query Console", }, }, - Metadata: &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: queryID, - }, + ExternalResources: []*core.ExternalResource{ + { + ExternalID: queryID, }, }, } From ff25c1d56152d37ce516a8fcdd45a23309b9e48a Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 4 Jan 2022 09:48:01 -0600 Subject: [PATCH 02/11] added ExternalResource documentation comments Signed-off-by: Daniel Rammer --- go.mod | 2 ++ go/tasks/pluginmachinery/core/phase.go | 12 +++++------- go/tasks/plugins/array/core/state.go | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 9f4a81be7..d1001c6c6 100644 --- a/go.mod +++ b/go.mod @@ -50,3 +50,5 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d + +replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 852503127..50c1ee8d3 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -4,8 +4,6 @@ import ( "fmt" "time" - //"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" // TODO hamersaw - remove - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" structpb "github.com/golang/protobuf/ptypes/struct" ) @@ -70,10 +68,12 @@ func (p Phase) IsWaitingForResources() bool { } type ExternalResource struct { - // TODO hamersaw - document - ExternalID string + // A unique identifier for the external resource + ExternalID string + // The nubmer of times this external resource has been attempted RetryAttempt int - Phase Phase + // Phase (if exists) associated with the external resource + Phase Phase } type TaskInfo struct { @@ -84,8 +84,6 @@ type TaskInfo struct { OccurredAt *time.Time // Custom Event information that the plugin would like to expose to the front-end CustomInfo *structpb.Struct - // Metadata around how a task was executed - //Metadata *event.TaskExecutionMetadata // TODO hamersaw - remove // A collection of information about external resources launched by this task ExternalResources []*ExternalResource } diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index bbb19c672..93bd29675 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -195,11 +195,11 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl } for childIdx, subTaskID := range subTaskIDs { - nowTaskInfo.ExternalResources = append(nowTaskInfo.ExternalResources, &core.ExternalResource{ + nowTaskInfo.ExternalResources[childIdx] = &core.ExternalResource{ ExternalID: *subTaskID, RetryAttempt: 0, // TODO hamersaw - set retry attempt Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], - }) + } } switch p, version := state.GetPhase(); p { From 24edb17ac86bcb85de6c68d443c4ed8f8847a371 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 4 Jan 2022 11:27:47 -0600 Subject: [PATCH 03/11] setting retry attempt on external resources Signed-off-by: Daniel Rammer --- go.sum | 2 ++ go/tasks/pluginmachinery/core/phase.go | 2 +- go/tasks/plugins/array/awsbatch/executor.go | 2 +- go/tasks/plugins/array/core/state.go | 26 ++++++--------------- go/tasks/plugins/array/k8s/executor.go | 2 +- 5 files changed, 12 insertions(+), 22 deletions(-) diff --git a/go.sum b/go.sum index 280ffeb9c..b97918ae3 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUBRz90= github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb h1:l3/2D23ruD0yw4O1HAeZU0NICDP7//W+XebRHIWlU1A= +github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.3.36 h1:XLvc7kfc9XkQBpPvNXevh5+Ijbgmd7gEOHTWhdEY5eA= github.com/flyteorg/flytestdlib v0.3.36/go.mod h1:7cDWkY3v7xsoesFcDdu6DSW5Q2U2W5KlHUbUHSwBG1Q= diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 50c1ee8d3..22628d09d 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -71,7 +71,7 @@ type ExternalResource struct { // A unique identifier for the external resource ExternalID string // The nubmer of times this external resource has been attempted - RetryAttempt int + RetryAttempt uint32 // Phase (if exists) associated with the external resource Phase Phase } diff --git a/go/tasks/plugins/array/awsbatch/executor.go b/go/tasks/plugins/array/awsbatch/executor.go index 104149f29..fa8fa7e4d 100644 --- a/go/tasks/plugins/array/awsbatch/executor.go +++ b/go/tasks/plugins/array/awsbatch/executor.go @@ -112,7 +112,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c logger.Infof(ctx, "Exiting handle with phase [%v]", pluginState.State.CurrentPhase) // Determine transition information from the state - phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, subTaskDetails.LogLinks, subTaskDetails.SubTaskIDs) + phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, subTaskDetails.LogLinks, subTaskDetails.SubTaskIDs, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) if err != nil { return core.UnknownTransition, err } diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 93bd29675..403da9c80 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - //"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" // TODO hamersaw - remove - "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus" @@ -170,23 +168,9 @@ func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 { // Info fields will always be nil, because we're going to send log links individually. This simplifies our state // handling as we don't have to keep an ever growing list of log links (our batch jobs can be 5000 sub-tasks, keeping // all the log links takes up a lot of space). -func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string) (core.PhaseInfo, error) { - +func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string, retryAttempt uint32) (core.PhaseInfo, error) { phaseInfo := core.PhaseInfoUndefined t := time.Now() - // TODO hamersaw - remove - /*nowTaskInfo := &core.TaskInfo{ - OccurredAt: &t, - Logs: logLinks, - } - if nowTaskInfo.Metadata == nil { - nowTaskInfo.Metadata = &event.TaskExecutionMetadata{} - } - for childIdx, subTaskID := range subTaskIDs { - nowTaskInfo.Metadata.ExternalResources = append(nowTaskInfo.Metadata.ExternalResources, &event.ExternalResourceInfo{ - ExternalId: *subTaskID, - }) - }*/ nowTaskInfo := &core.TaskInfo{ OccurredAt: &t, @@ -196,8 +180,12 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl for childIdx, subTaskID := range subTaskIDs { nowTaskInfo.ExternalResources[childIdx] = &core.ExternalResource{ - ExternalID: *subTaskID, - RetryAttempt: 0, // TODO hamersaw - set retry attempt + ExternalID: *subTaskID, + // Currently a failure within any map subtask triggers re-execution of all subtasks, + // therefore the RetryAttempt is identical for all external resources. Tracking retries + // over individual subtasks may be implemented as an additional ArrayStatus data + // structure, this should be updated accordingly. + RetryAttempt: retryAttempt, Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], } } diff --git a/go/tasks/plugins/array/k8s/executor.go b/go/tasks/plugins/array/k8s/executor.go index 7e8bc6353..06e6b71a2 100644 --- a/go/tasks/plugins/array/k8s/executor.go +++ b/go/tasks/plugins/array/k8s/executor.go @@ -136,7 +136,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c } // Determine transition information from the state - phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, nextState, logLinks, subTaskIDs) + phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, nextState, logLinks, subTaskIDs, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) if err != nil { return core.UnknownTransition, err } From de5b4773187ced71b133e822bccc3fc3c8a85f16 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 4 Jan 2022 14:21:49 -0600 Subject: [PATCH 04/11] fixed unit tests and lint issues Signed-off-by: Daniel Rammer --- .../pluginmachinery/webapi/example/plugin.go | 10 +-- go/tasks/plugins/array/core/state.go | 2 +- go/tasks/plugins/array/core/state_test.go | 79 +++++++++++++------ go/tasks/plugins/hive/execution_state_test.go | 12 +-- .../k8s/sagemaker/builtin_training_test.go | 12 +-- .../k8s/sagemaker/custom_training_test.go | 12 +-- .../sagemaker/hyperparameter_tuning_test.go | 12 +-- go/tasks/plugins/presto/execution_state.go | 10 +-- .../plugins/presto/execution_state_test.go | 4 +- go/tasks/plugins/webapi/athena/plugin_test.go | 11 +-- 10 files changed, 72 insertions(+), 92 deletions(-) diff --git a/go/tasks/pluginmachinery/webapi/example/plugin.go b/go/tasks/pluginmachinery/webapi/example/plugin.go index 401cb9064..2c300e92f 100644 --- a/go/tasks/pluginmachinery/webapi/example/plugin.go +++ b/go/tasks/pluginmachinery/webapi/example/plugin.go @@ -4,8 +4,6 @@ import ( "context" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flytestdlib/errors" @@ -96,11 +94,9 @@ func (p Plugin) Status(ctx context.Context, tCtx webapi.StatusContext) (phase co }, }, OccurredAt: &tNow, - Metadata: &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: "abc", - }, + ExternalResources: []*core.ExternalResource{ + { + ExternalID: "abc", }, }, }), nil diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 403da9c80..408f7efd9 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -183,7 +183,7 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl ExternalID: *subTaskID, // Currently a failure within any map subtask triggers re-execution of all subtasks, // therefore the RetryAttempt is identical for all external resources. Tracking retries - // over individual subtasks may be implemented as an additional ArrayStatus data + // over individual subtasks may be implemented as an additional ArrayStatus data // structure, this should be updated accordingly. RetryAttempt: retryAttempt, Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 876612623..321586ee4 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -5,14 +5,13 @@ import ( "fmt" "testing" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/plugins" "github.com/golang/protobuf/proto" "github.com/flyteorg/flytestdlib/bitarray" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus" "github.com/stretchr/testify/assert" ) @@ -51,32 +50,39 @@ func assertBitSetsEqual(t testing.TB, b1, b2 *bitarray.BitSet, len int) { } } -func assertTaskExecutionMetadata(t *testing.T, subTaskIDs []*string, metadata *event.TaskExecutionMetadata) { - assert.NotNil(t, metadata) - var externalResources = make([]*event.ExternalResourceInfo, len(subTaskIDs)) +func assertTaskExternalResources(t *testing.T, subTaskIDs []*string, detailedArray *bitarray.CompactArray, retryAttempt uint32, externalResources []*core.ExternalResource) { + assert.NotNil(t, externalResources) for i, subTaskID := range subTaskIDs { - externalResources[i] = &event.ExternalResourceInfo{ - ExternalId: *subTaskID, - } + externalResource := externalResources[i] + assert.Equal(t, *subTaskID, externalResource.ExternalID) + assert.Equal(t, retryAttempt, externalResource.RetryAttempt) + assert.Equal(t, core.Phases[detailedArray.GetItem(i)], externalResource.Phase) } - assert.True(t, proto.Equal(&event.TaskExecutionMetadata{ - ExternalResources: externalResources, - }, metadata)) } func TestMapArrayStateToPluginPhase(t *testing.T) { ctx := context.Background() - var subTaskIDs = make([]*string, 3) - for i := 0; i < 3; i++ { + + subTaskCount := 3 + retryAttempt := uint32(1) + + var subTaskIDs = make([]*string, subTaskCount) + detailedArray := NewPhasesCompactArray(uint(subTaskCount)) + for i := 0; i < subTaskCount; i++ { subTaskID := fmt.Sprintf("sub_task_%d", i) subTaskIDs[i] = &subTaskID + + detailedArray.SetItem(i, bitarray.Item(core.PhaseRunning)) } t.Run("start", func(t *testing.T) { s := State{ CurrentPhase: PhaseStart, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.Equal(t, core.PhaseInitializing, phaseInfo.Phase()) }) @@ -85,9 +91,12 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { s := State{ CurrentPhase: PhaseLaunch, PhaseVersion: 0, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) }) @@ -98,13 +107,16 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { PhaseVersion: 8, OriginalArraySize: 10, ExecutionArraySize: 5, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) assert.Equal(t, uint32(368), phaseInfo.Version()) - assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata) + assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) }) t.Run("write to discovery", func(t *testing.T) { @@ -113,58 +125,73 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { PhaseVersion: 8, OriginalArraySize: 10, ExecutionArraySize: 5, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) assert.Equal(t, uint32(548), phaseInfo.Version()) - assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata) + assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) }) t.Run("success", func(t *testing.T) { s := State{ CurrentPhase: PhaseSuccess, PhaseVersion: 0, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.Equal(t, core.PhaseSuccess, phaseInfo.Phase()) - assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata) + assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) }) t.Run("retryable failure", func(t *testing.T) { s := State{ CurrentPhase: PhaseRetryableFailure, PhaseVersion: 0, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.Equal(t, core.PhaseRetryableFailure, phaseInfo.Phase()) - assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata) + assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) }) t.Run("permanent failure", func(t *testing.T) { s := State{ CurrentPhase: PhasePermanentFailure, PhaseVersion: 0, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.Equal(t, core.PhasePermanentFailure, phaseInfo.Phase()) - assertTaskExecutionMetadata(t, subTaskIDs, phaseInfo.Info().Metadata) + assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) }) t.Run("All phases", func(t *testing.T) { for _, p := range PhaseValues() { s := State{ CurrentPhase: p, + ArrayStatus: arraystatus.ArrayStatus{ + Detailed: detailedArray, + }, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) assert.NoError(t, err) assert.NotEqual(t, core.PhaseUndefined, phaseInfo.Phase()) } diff --git a/go/tasks/plugins/hive/execution_state_test.go b/go/tasks/plugins/hive/execution_state_test.go index cd5cd868e..749e23b46 100644 --- a/go/tasks/plugins/hive/execution_state_test.go +++ b/go/tasks/plugins/hive/execution_state_test.go @@ -7,9 +7,6 @@ import ( "testing" "time" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/golang/protobuf/proto" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" ioMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" @@ -128,13 +125,8 @@ func TestConstructTaskInfo(t *testing.T) { taskInfo := ConstructTaskInfo(e) assert.Equal(t, "https://wellness.qubole.com/v2/analyze?command_id=123", taskInfo.Logs[0].Uri) - assert.True(t, proto.Equal(taskInfo.Metadata, &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: "123", - }, - }, - })) + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "123") } func TestMapExecutionStateToPhaseInfo(t *testing.T) { diff --git a/go/tasks/plugins/k8s/sagemaker/builtin_training_test.go b/go/tasks/plugins/k8s/sagemaker/builtin_training_test.go index df02c97a3..1634ff7b7 100644 --- a/go/tasks/plugins/k8s/sagemaker/builtin_training_test.go +++ b/go/tasks/plugins/k8s/sagemaker/builtin_training_test.go @@ -5,9 +5,6 @@ import ( "fmt" "testing" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/golang/protobuf/proto" - "github.com/go-test/deep" flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -273,12 +270,7 @@ func Test_awsSagemakerPlugin_getEventInfoForTrainingJob(t *testing.T) { if diff := deep.Equal(expectedCustomInfo, taskInfo.CustomInfo); diff != nil { assert.FailNow(t, "Should be equal.", "Diff: %v", diff) } - assert.True(t, proto.Equal(taskInfo.Metadata, &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: "some-acceptable-name", - }, - }, - })) + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "some-acceptable-name") }) } diff --git a/go/tasks/plugins/k8s/sagemaker/custom_training_test.go b/go/tasks/plugins/k8s/sagemaker/custom_training_test.go index 91709b507..748fa7d35 100644 --- a/go/tasks/plugins/k8s/sagemaker/custom_training_test.go +++ b/go/tasks/plugins/k8s/sagemaker/custom_training_test.go @@ -6,9 +6,6 @@ import ( "strconv" "testing" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/golang/protobuf/proto" - "github.com/go-test/deep" flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -297,12 +294,7 @@ func Test_awsSagemakerPlugin_getEventInfoForCustomTrainingJob(t *testing.T) { if diff := deep.Equal(expectedCustomInfo, taskInfo.CustomInfo); diff != nil { assert.FailNow(t, "Should be equal.", "Diff: %v", diff) } - assert.True(t, proto.Equal(taskInfo.Metadata, &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: "some-acceptable-name", - }, - }, - })) + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "some-acceptable-name") }) } diff --git a/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning_test.go b/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning_test.go index 796949d86..e710dc564 100644 --- a/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning_test.go +++ b/go/tasks/plugins/k8s/sagemaker/hyperparameter_tuning_test.go @@ -5,9 +5,6 @@ import ( "fmt" "testing" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/golang/protobuf/proto" - "github.com/go-test/deep" flyteIdlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -129,12 +126,7 @@ func Test_awsSagemakerPlugin_getEventInfoForHyperparameterTuningJob(t *testing.T if diff := deep.Equal(expectedCustomInfo, taskInfo.CustomInfo); diff != nil { assert.FailNow(t, "Should be equal.", "Diff: %v", diff) } - assert.True(t, proto.Equal(taskInfo.Metadata, &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: "some-acceptable-name", - }, - }, - })) + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "some-acceptable-name") }) } diff --git a/go/tasks/plugins/presto/execution_state.go b/go/tasks/plugins/presto/execution_state.go index 3370b0b95..7d241cb09 100644 --- a/go/tasks/plugins/presto/execution_state.go +++ b/go/tasks/plugins/presto/execution_state.go @@ -3,8 +3,6 @@ package presto import ( "context" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" @@ -506,11 +504,9 @@ func ConstructTaskInfo(e ExecutionState) *core.TaskInfo { return &core.TaskInfo{ Logs: logs, OccurredAt: &t, - Metadata: &event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: e.CommandID, - }, + ExternalResources: []*core.ExternalResource{ + { + ExternalID: e.CommandID, }, }, } diff --git a/go/tasks/plugins/presto/execution_state_test.go b/go/tasks/plugins/presto/execution_state_test.go index d6caad0b8..9a2c4c15b 100644 --- a/go/tasks/plugins/presto/execution_state_test.go +++ b/go/tasks/plugins/presto/execution_state_test.go @@ -106,8 +106,8 @@ func TestConstructTaskInfo(t *testing.T) { taskInfo := ConstructTaskInfo(e) assert.Equal(t, "https://prestoproxy-internal.flyteorg.net:443", taskInfo.Logs[0].Uri) - assert.Len(t, taskInfo.Metadata.ExternalResources, 1) - assert.Equal(t, taskInfo.Metadata.ExternalResources[0].ExternalId, "123") + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "123") } func TestMapExecutionStateToPhaseInfo(t *testing.T) { diff --git a/go/tasks/plugins/webapi/athena/plugin_test.go b/go/tasks/plugins/webapi/athena/plugin_test.go index d85b42573..5f821bb67 100644 --- a/go/tasks/plugins/webapi/athena/plugin_test.go +++ b/go/tasks/plugins/webapi/athena/plugin_test.go @@ -5,8 +5,6 @@ import ( awsSdk "github.com/aws/aws-sdk-go-v2/aws" idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" ) @@ -20,11 +18,6 @@ func TestCreateTaskInfo(t *testing.T) { Name: "Athena Query Console", }, }, taskInfo.Logs) - assert.True(t, proto.Equal(&event.TaskExecutionMetadata{ - ExternalResources: []*event.ExternalResourceInfo{ - { - ExternalId: "query_id", - }, - }, - }, taskInfo.Metadata)) + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "query_id") } From 478f06513f60f59ee79807e5c4ccad0096d9a8c9 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 6 Jan 2022 19:00:18 -0600 Subject: [PATCH 05/11] tracking RetryAttempt for k8s array plugin using a CompactArray Signed-off-by: Daniel Rammer --- go/tasks/plugins/array/awsbatch/executor.go | 2 +- go/tasks/plugins/array/core/state.go | 16 +++++++++------- go/tasks/plugins/array/k8s/executor.go | 2 +- go/tasks/plugins/array/k8s/monitor.go | 20 ++++++++++++++++++++ 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/go/tasks/plugins/array/awsbatch/executor.go b/go/tasks/plugins/array/awsbatch/executor.go index fa8fa7e4d..104149f29 100644 --- a/go/tasks/plugins/array/awsbatch/executor.go +++ b/go/tasks/plugins/array/awsbatch/executor.go @@ -112,7 +112,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c logger.Infof(ctx, "Exiting handle with phase [%v]", pluginState.State.CurrentPhase) // Determine transition information from the state - phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, subTaskDetails.LogLinks, subTaskDetails.SubTaskIDs, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) + phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, pluginState.State, subTaskDetails.LogLinks, subTaskDetails.SubTaskIDs) if err != nil { return core.UnknownTransition, err } diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 50169c6e3..afa70ba41 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -50,6 +50,9 @@ type State struct { // Which sub-tasks to cache, (using the original index, that is, the length is ArrayJob.size) IndexesToCache *bitarray.BitSet `json:"indexesToCache"` + + // TODO hamersaw - document + RetryAttempts bitarray.CompactArray `json:"retryAttempts"` } func (s State) GetReason() string { @@ -168,7 +171,7 @@ func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 { // Info fields will always be nil, because we're going to send log links individually. This simplifies our state // handling as we don't have to keep an ever growing list of log links (our batch jobs can be 5000 sub-tasks, keeping // all the log links takes up a lot of space). -func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string, retryAttempt uint32) (core.PhaseInfo, error) { +func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string) (core.PhaseInfo, error) { phaseInfo := core.PhaseInfoUndefined t := time.Now() @@ -179,13 +182,12 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl } for childIdx, subTaskID := range subTaskIDs { + originalIdx := CalculateOriginalIndex(childIdx, state.GetIndexesToCache()) + nowTaskInfo.ExternalResources[childIdx] = &core.ExternalResource{ - ExternalID: *subTaskID, - // Currently a failure within any map subtask triggers re-execution of all subtasks, - // therefore the RetryAttempt is identical for all external resources. Tracking retries - // over individual subtasks may be implemented as an additional ArrayStatus data - // structure, this should be updated accordingly. - RetryAttempt: retryAttempt, + ExternalID: *subTaskID, + // TODO hamersaw - need to set RetryAttempts on awsbatch state + RetryAttempt: uint32(state.RetryAttempts.GetItem(originalIdx)), Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], } } diff --git a/go/tasks/plugins/array/k8s/executor.go b/go/tasks/plugins/array/k8s/executor.go index 06e6b71a2..7e8bc6353 100644 --- a/go/tasks/plugins/array/k8s/executor.go +++ b/go/tasks/plugins/array/k8s/executor.go @@ -136,7 +136,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c } // Determine transition information from the state - phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, nextState, logLinks, subTaskIDs, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) + phaseInfo, err := arrayCore.MapArrayStateToPluginPhase(ctx, nextState, logLinks, subTaskIDs) if err != nil { return core.UnknownTransition, err } diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index cea135181..3899c9a8e 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -61,6 +61,26 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon currentState.ArrayStatus = *newArrayStatus } + // TODO hamersaw - initialize attempts? maxRetries? + if len(currentState.RetryAttempts.GetItems()) == 0 { + count := uint(currentState.GetOriginalArraySize()) + maxValue := bitarray.Item(tCtx.TaskExecutionMetadata().GetMaxAttempts()) + + retryAttemptsArray, err := bitarray.NewCompactArray(count, maxValue) + if err != nil { + logger.Errorf(context.Background(), "Failed to create attempts compact array with [count: %v, maxValue: %v]", count, maxValue) + return currentState, logLinks, subTaskIDs, nil + } + + retryAttempt := bitarray.Item(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) + for i := 0; i Date: Thu, 6 Jan 2022 19:10:08 -0600 Subject: [PATCH 06/11] added a few comments Signed-off-by: Daniel Rammer --- go/tasks/plugins/array/core/state.go | 6 ++---- go/tasks/plugins/array/k8s/monitor.go | 9 +++++++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index afa70ba41..46094eecc 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -51,7 +51,7 @@ type State struct { // Which sub-tasks to cache, (using the original index, that is, the length is ArrayJob.size) IndexesToCache *bitarray.BitSet `json:"indexesToCache"` - // TODO hamersaw - document + // Tracks the number of subtask retries using the execution index RetryAttempts bitarray.CompactArray `json:"retryAttempts"` } @@ -182,12 +182,10 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl } for childIdx, subTaskID := range subTaskIDs { - originalIdx := CalculateOriginalIndex(childIdx, state.GetIndexesToCache()) - nowTaskInfo.ExternalResources[childIdx] = &core.ExternalResource{ ExternalID: *subTaskID, // TODO hamersaw - need to set RetryAttempts on awsbatch state - RetryAttempt: uint32(state.RetryAttempts.GetItem(originalIdx)), + RetryAttempt: uint32(state.RetryAttempts.GetItem(childIdx)), Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], } } diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index 3899c9a8e..581d64631 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -61,9 +61,10 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon currentState.ArrayStatus = *newArrayStatus } - // TODO hamersaw - initialize attempts? maxRetries? + // If the current State is a newly minted then we must initialize RetryAttempts to track how + // many times each subtask is executed. if len(currentState.RetryAttempts.GetItems()) == 0 { - count := uint(currentState.GetOriginalArraySize()) + count := uint(currentState.GetExecutionArraySize()) maxValue := bitarray.Item(tCtx.TaskExecutionMetadata().GetMaxAttempts()) retryAttemptsArray, err := bitarray.NewCompactArray(count, maxValue) @@ -72,6 +73,10 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon return currentState, logLinks, subTaskIDs, nil } + // Currently if any subtask fails then all subtasks are retried up to MaxAttempts. Therefore, we + // all subtasks have an identical RetryAttempt, namely that of the map task execution metadata. + // Once retries over individual subtasks is implemented we should revisit this logic and instead + // increment the RetryAttempt on each subtask everytime a new pod is created. retryAttempt := bitarray.Item(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) for i := 0; i Date: Thu, 6 Jan 2022 19:28:04 -0600 Subject: [PATCH 07/11] setting RetryAttempts for awsbatch subtasks Signed-off-by: Daniel Rammer --- go/tasks/plugins/array/awsbatch/launcher.go | 11 ++++++++++- go/tasks/plugins/array/awsbatch/monitor.go | 1 + go/tasks/plugins/array/core/state.go | 6 +++++- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/go/tasks/plugins/array/awsbatch/launcher.go b/go/tasks/plugins/array/awsbatch/launcher.go index 32f2c5d1d..8959af267 100644 --- a/go/tasks/plugins/array/awsbatch/launcher.go +++ b/go/tasks/plugins/array/awsbatch/launcher.go @@ -6,6 +6,7 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/errors" + "github.com/flyteorg/flytestdlib/bitarray" "github.com/flyteorg/flytestdlib/logger" arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core" @@ -53,6 +54,13 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl } metrics.SubTasksSubmitted.Add(ctx, float64(size)) + + retryAttemptsArray, err := bitarray.NewCompactArray(uint(size), bitarray.Item(pluginConfig.MaxRetries)) + if err != nil { + logger.Errorf(context.Background(), "Failed to create attempts compact array with [count: %v, maxValue: %v]", size, pluginConfig.MaxRetries) + return nil, err + } + parentState := currentState. SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, 0). SetArrayStatus(arraystatus.ArrayStatus{ @@ -61,7 +69,8 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl }, Detailed: arrayCore.NewPhasesCompactArray(uint(size)), }). - SetReason("Successfully launched subtasks.") + SetReason("Successfully launched subtasks."). + SetRetryAttempts(retryAttemptsArray) nextState = currentState.SetExternalJobID(j) nextState.State = parentState diff --git a/go/tasks/plugins/array/awsbatch/monitor.go b/go/tasks/plugins/array/awsbatch/monitor.go index 87ef4ade7..a7d033aa1 100644 --- a/go/tasks/plugins/array/awsbatch/monitor.go +++ b/go/tasks/plugins/array/awsbatch/monitor.go @@ -112,6 +112,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(actualPhase)) newArrayStatus.Summary.Inc(actualPhase) + parentState.RetryAttempts.SetItem(childIdx, bitarray.Item(len(subJob.Attempts))) } if queued > 0 { diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 46094eecc..674cea9e3 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -112,6 +112,11 @@ func (s *State) SetReason(reason string) *State { return s } +func (s *State) SetRetryAttempts(retryAttempts bitarray.CompactArray) *State { + s.RetryAttempts = retryAttempts + return s +} + func (s *State) SetExecutionArraySize(size int) *State { s.ExecutionArraySize = size return s @@ -184,7 +189,6 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl for childIdx, subTaskID := range subTaskIDs { nowTaskInfo.ExternalResources[childIdx] = &core.ExternalResource{ ExternalID: *subTaskID, - // TODO hamersaw - need to set RetryAttempts on awsbatch state RetryAttempt: uint32(state.RetryAttempts.GetItem(childIdx)), Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], } From 3c163205ec7fa62dee126b55ae3cd94d4773446d Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 6 Jan 2022 20:13:30 -0600 Subject: [PATCH 08/11] fixed unit tests and lint issues Signed-off-by: Daniel Rammer --- .../plugins/array/awsbatch/launcher_test.go | 5 +++ .../plugins/array/awsbatch/monitor_test.go | 8 ++++ go/tasks/plugins/array/core/state_test.go | 43 ++++++++++++------- go/tasks/plugins/array/k8s/monitor.go | 2 +- go/tasks/plugins/array/k8s/monitor_test.go | 13 ++++++ tests/end_to_end.go | 1 + 6 files changed, 55 insertions(+), 17 deletions(-) diff --git a/go/tasks/plugins/array/awsbatch/launcher_test.go b/go/tasks/plugins/array/awsbatch/launcher_test.go index 5520a9b3d..d135500b1 100644 --- a/go/tasks/plugins/array/awsbatch/launcher_test.go +++ b/go/tasks/plugins/array/awsbatch/launcher_test.go @@ -3,6 +3,7 @@ package awsbatch import ( "testing" + "github.com/flyteorg/flytestdlib/bitarray" "github.com/flyteorg/flytestdlib/promutils" "github.com/stretchr/testify/mock" @@ -110,6 +111,9 @@ func TestLaunchSubTasks(t *testing.T) { JobDefinitionArn: "arn", } + retryAttemptsArray, err := bitarray.NewCompactArray(5, bitarray.Item(0)) + assert.NoError(t, err) + expectedState := &State{ State: &core2.State{ CurrentPhase: core2.PhaseCheckingSubTaskExecutions, @@ -123,6 +127,7 @@ func TestLaunchSubTasks(t *testing.T) { }, Detailed: arrayCore.NewPhasesCompactArray(5), }, + RetryAttempts: retryAttemptsArray, }, ExternalJobID: refStr("qpxyarq"), diff --git a/go/tasks/plugins/array/awsbatch/monitor_test.go b/go/tasks/plugins/array/awsbatch/monitor_test.go index b54af4465..621512b4a 100644 --- a/go/tasks/plugins/array/awsbatch/monitor_test.go +++ b/go/tasks/plugins/array/awsbatch/monitor_test.go @@ -136,6 +136,9 @@ func TestCheckSubTasksState(t *testing.T) { inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) assert.NoError(t, err) + retryAttemptsArray, err := bitarray.NewCompactArray(1, bitarray.Item(1)) + assert.NoError(t, err) + newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{ State: &arrayCore.State{ CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions, @@ -146,6 +149,7 @@ func TestCheckSubTasksState(t *testing.T) { Detailed: arrayCore.NewPhasesCompactArray(1), }, IndexesToCache: bitarray.NewBitSet(1), + RetryAttempts: retryAttemptsArray, }, ExternalJobID: refStr("job-id"), JobDefinitionArn: "", @@ -180,6 +184,9 @@ func TestCheckSubTasksState(t *testing.T) { inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) assert.NoError(t, err) + retryAttemptsArray, err := bitarray.NewCompactArray(2, bitarray.Item(1)) + assert.NoError(t, err) + newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{ State: &arrayCore.State{ CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions, @@ -190,6 +197,7 @@ func TestCheckSubTasksState(t *testing.T) { Detailed: arrayCore.NewPhasesCompactArray(2), }, IndexesToCache: bitarray.NewBitSet(2), + RetryAttempts: retryAttemptsArray, }, ExternalJobID: refStr("job-id"), JobDefinitionArn: "", diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 321586ee4..3aef2acee 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -50,12 +50,12 @@ func assertBitSetsEqual(t testing.TB, b1, b2 *bitarray.BitSet, len int) { } } -func assertTaskExternalResources(t *testing.T, subTaskIDs []*string, detailedArray *bitarray.CompactArray, retryAttempt uint32, externalResources []*core.ExternalResource) { +func assertTaskExternalResources(t *testing.T, subTaskIDs []*string, retryAttemptsArray *bitarray.CompactArray, detailedArray *bitarray.CompactArray, externalResources []*core.ExternalResource) { assert.NotNil(t, externalResources) for i, subTaskID := range subTaskIDs { externalResource := externalResources[i] assert.Equal(t, *subTaskID, externalResource.ExternalID) - assert.Equal(t, retryAttempt, externalResource.RetryAttempt) + assert.Equal(t, retryAttemptsArray.GetItem(i), bitarray.Item(externalResource.RetryAttempt)) assert.Equal(t, core.Phases[detailedArray.GetItem(i)], externalResource.Phase) } } @@ -64,15 +64,18 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ctx := context.Background() subTaskCount := 3 - retryAttempt := uint32(1) var subTaskIDs = make([]*string, subTaskCount) detailedArray := NewPhasesCompactArray(uint(subTaskCount)) + retryAttemptsArray, err := bitarray.NewCompactArray(uint(subTaskCount), bitarray.Item(1)) + assert.NoError(t, err) + for i := 0; i < subTaskCount; i++ { subTaskID := fmt.Sprintf("sub_task_%d", i) subTaskIDs[i] = &subTaskID detailedArray.SetItem(i, bitarray.Item(core.PhaseRunning)) + retryAttemptsArray.SetItem(i, bitarray.Item(1)) } t.Run("start", func(t *testing.T) { @@ -81,8 +84,9 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.Equal(t, core.PhaseInitializing, phaseInfo.Phase()) }) @@ -94,9 +98,10 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) }) @@ -110,13 +115,14 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) assert.Equal(t, uint32(368), phaseInfo.Version()) - assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) + assertTaskExternalResources(t, subTaskIDs, &retryAttemptsArray, &detailedArray, phaseInfo.Info().ExternalResources) }) t.Run("write to discovery", func(t *testing.T) { @@ -128,13 +134,14 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.Equal(t, core.PhaseRunning, phaseInfo.Phase()) assert.Equal(t, uint32(548), phaseInfo.Version()) - assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) + assertTaskExternalResources(t, subTaskIDs, &retryAttemptsArray, &detailedArray, phaseInfo.Info().ExternalResources) }) t.Run("success", func(t *testing.T) { @@ -144,12 +151,13 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.Equal(t, core.PhaseSuccess, phaseInfo.Phase()) - assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) + assertTaskExternalResources(t, subTaskIDs, &retryAttemptsArray, &detailedArray, phaseInfo.Info().ExternalResources) }) t.Run("retryable failure", func(t *testing.T) { @@ -159,12 +167,13 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.Equal(t, core.PhaseRetryableFailure, phaseInfo.Phase()) - assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) + assertTaskExternalResources(t, subTaskIDs, &retryAttemptsArray, &detailedArray, phaseInfo.Info().ExternalResources) }) t.Run("permanent failure", func(t *testing.T) { @@ -174,12 +183,13 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.Equal(t, core.PhasePermanentFailure, phaseInfo.Phase()) - assertTaskExternalResources(t, subTaskIDs, &detailedArray, retryAttempt, phaseInfo.Info().ExternalResources) + assertTaskExternalResources(t, subTaskIDs, &retryAttemptsArray, &detailedArray, phaseInfo.Info().ExternalResources) }) t.Run("All phases", func(t *testing.T) { @@ -189,9 +199,10 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, + RetryAttempts: retryAttemptsArray, } - phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs, retryAttempt) + phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) assert.NotEqual(t, core.PhaseUndefined, phaseInfo.Phase()) } diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index 581d64631..6e0989b2c 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -78,7 +78,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon // Once retries over individual subtasks is implemented we should revisit this logic and instead // increment the RetryAttempt on each subtask everytime a new pod is created. retryAttempt := bitarray.Item(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) - for i := 0; i Date: Fri, 7 Jan 2022 05:01:07 -0600 Subject: [PATCH 09/11] populating external resource index with original index Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 2 ++ go/tasks/pluginmachinery/core/phase.go | 3 +++ go/tasks/plugins/array/core/state.go | 11 ++++++---- go/tasks/plugins/array/core/state_test.go | 25 +++++++++++++++-------- go/tasks/plugins/array/k8s/monitor.go | 9 ++++---- 6 files changed, 34 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 7bb0366e3..f657fe526 100644 --- a/go.mod +++ b/go.mod @@ -51,4 +51,4 @@ require ( replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb +replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004 diff --git a/go.sum b/go.sum index 76c39cdf6..713bc64e4 100644 --- a/go.sum +++ b/go.sum @@ -237,6 +237,8 @@ github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUB github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb h1:l3/2D23ruD0yw4O1HAeZU0NICDP7//W+XebRHIWlU1A= github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004 h1:LO6e9bXKLoSw+xQ+KX5iQVHSTcG11CFtZ7unHTo3FQU= +github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.4.7 h1:SMPPXI3j/MjP7D2fqaR+lPQkTrqYS7xZbwsgJI2F8SU= github.com/flyteorg/flytestdlib v0.4.7/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs= diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 22628d09d..4e0c15dc5 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -70,6 +70,9 @@ func (p Phase) IsWaitingForResources() bool { type ExternalResource struct { // A unique identifier for the external resource ExternalID string + // A unique index for the external resource. Although the ID may change, this will remain the same + // throughout task event reports and retries. + Index uint32 // The nubmer of times this external resource has been attempted RetryAttempt uint32 // Phase (if exists) associated with the external resource diff --git a/go/tasks/plugins/array/core/state.go b/go/tasks/plugins/array/core/state.go index 674cea9e3..a8908fec0 100644 --- a/go/tasks/plugins/array/core/state.go +++ b/go/tasks/plugins/array/core/state.go @@ -186,11 +186,14 @@ func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idl ExternalResources: make([]*core.ExternalResource, len(subTaskIDs)), } - for childIdx, subTaskID := range subTaskIDs { - nowTaskInfo.ExternalResources[childIdx] = &core.ExternalResource{ + for childIndex, subTaskID := range subTaskIDs { + originalIndex := CalculateOriginalIndex(childIndex, state.GetIndexesToCache()) + + nowTaskInfo.ExternalResources[childIndex] = &core.ExternalResource{ ExternalID: *subTaskID, - RetryAttempt: uint32(state.RetryAttempts.GetItem(childIdx)), - Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIdx)], + Index: uint32(originalIndex), + RetryAttempt: uint32(state.RetryAttempts.GetItem(childIndex)), + Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIndex)], } } diff --git a/go/tasks/plugins/array/core/state_test.go b/go/tasks/plugins/array/core/state_test.go index 3aef2acee..1f477a874 100644 --- a/go/tasks/plugins/array/core/state_test.go +++ b/go/tasks/plugins/array/core/state_test.go @@ -67,6 +67,7 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { var subTaskIDs = make([]*string, subTaskCount) detailedArray := NewPhasesCompactArray(uint(subTaskCount)) + indexesToCache := InvertBitSet(bitarray.NewBitSet(uint(subTaskCount)), uint(subTaskCount)) retryAttemptsArray, err := bitarray.NewCompactArray(uint(subTaskCount), bitarray.Item(1)) assert.NoError(t, err) @@ -84,7 +85,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) assert.NoError(t, err) @@ -98,7 +100,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) @@ -115,7 +118,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) @@ -134,7 +138,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) @@ -151,7 +156,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) @@ -167,7 +173,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) @@ -183,7 +190,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) @@ -199,7 +207,8 @@ func TestMapArrayStateToPluginPhase(t *testing.T) { ArrayStatus: arraystatus.ArrayStatus{ Detailed: detailedArray, }, - RetryAttempts: retryAttemptsArray, + IndexesToCache: indexesToCache, + RetryAttempts: retryAttemptsArray, } phaseInfo, err := MapArrayStateToPluginPhase(ctx, &s, nil, subTaskIDs) diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index 6e0989b2c..c9d71ad2a 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -73,14 +73,13 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon return currentState, logLinks, subTaskIDs, nil } - // Currently if any subtask fails then all subtasks are retried up to MaxAttempts. Therefore, we - // all subtasks have an identical RetryAttempt, namely that of the map task execution metadata. - // Once retries over individual subtasks is implemented we should revisit this logic and instead + // Currently if any subtask fails then all subtasks are retried up to MaxAttempts. Therefore, all + // subtasks have an identical RetryAttempt, namely that of the map task execution metadata. Once + // retries over individual subtasks is implemented we should revisit this logic and instead // increment the RetryAttempt on each subtask everytime a new pod is created. retryAttempt := bitarray.Item(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) for i := 0; i < currentState.GetExecutionArraySize(); i++ { - originalIndex := arrayCore.CalculateOriginalIndex(i, currentState.GetIndexesToCache()) - retryAttemptsArray.SetItem(originalIndex, retryAttempt) + retryAttemptsArray.SetItem(i, retryAttempt) } currentState.RetryAttempts = retryAttemptsArray From 8230af133902163edb2f755058fef85c17b9712d Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Fri, 7 Jan 2022 10:43:51 -0600 Subject: [PATCH 10/11] updated comments Signed-off-by: Daniel Rammer --- go/tasks/plugins/array/k8s/monitor.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go index c9d71ad2a..f7d1bfcfa 100644 --- a/go/tasks/plugins/array/k8s/monitor.go +++ b/go/tasks/plugins/array/k8s/monitor.go @@ -61,8 +61,8 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon currentState.ArrayStatus = *newArrayStatus } - // If the current State is a newly minted then we must initialize RetryAttempts to track how - // many times each subtask is executed. + // If the current State is newly minted then we must initialize RetryAttempts to track how many + // times each subtask is executed. if len(currentState.RetryAttempts.GetItems()) == 0 { count := uint(currentState.GetExecutionArraySize()) maxValue := bitarray.Item(tCtx.TaskExecutionMetadata().GetMaxAttempts()) @@ -75,8 +75,8 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon // Currently if any subtask fails then all subtasks are retried up to MaxAttempts. Therefore, all // subtasks have an identical RetryAttempt, namely that of the map task execution metadata. Once - // retries over individual subtasks is implemented we should revisit this logic and instead - // increment the RetryAttempt on each subtask everytime a new pod is created. + // retries over individual subtasks are implemented we should revisit this logic and instead + // increment the RetryAttempt for each subtask everytime a new pod is created. retryAttempt := bitarray.Item(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID().RetryAttempt) for i := 0; i < currentState.GetExecutionArraySize(); i++ { retryAttemptsArray.SetItem(i, retryAttempt) From f1a11fda460ff6e757ee29a8ae1b63592139b46e Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 13 Jan 2022 06:40:06 -0600 Subject: [PATCH 11/11] updated flyteidl version Signed-off-by: Daniel Rammer --- go.mod | 4 +--- go.sum | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index f657fe526..b311f614e 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.0.0 github.com/aws/aws-sdk-go-v2/service/athena v1.0.0 github.com/coocood/freecache v1.1.1 - github.com/flyteorg/flyteidl v0.21.11 + github.com/flyteorg/flyteidl v0.21.22 github.com/flyteorg/flytestdlib v0.4.7 github.com/go-logr/zapr v0.4.0 // indirect github.com/go-test/deep v1.0.7 @@ -50,5 +50,3 @@ require ( ) replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d - -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004 diff --git a/go.sum b/go.sum index 713bc64e4..cdb4b56c4 100644 --- a/go.sum +++ b/go.sum @@ -239,6 +239,8 @@ github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb h1:l3/2D23ru github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004 h1:LO6e9bXKLoSw+xQ+KX5iQVHSTcG11CFtZ7unHTo3FQU= github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= +github.com/flyteorg/flyteidl v0.21.22 h1:u83LwahAVgnJ5B3taKX7UI1QGkqLEhtYFz2wXMmBREw= +github.com/flyteorg/flyteidl v0.21.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U= github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220= github.com/flyteorg/flytestdlib v0.4.7 h1:SMPPXI3j/MjP7D2fqaR+lPQkTrqYS7xZbwsgJI2F8SU= github.com/flyteorg/flytestdlib v0.4.7/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs=