Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Track RetryAttempt and Phase of ExternalResources #231

Merged
merged 13 commits into from
Jan 24, 2022
Merged
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.0.0
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v0.21.11
github.com/flyteorg/flyteidl v0.21.22
github.com/flyteorg/flytestdlib v0.4.7
github.com/go-logr/zapr v0.4.0 // indirect
github.com/go-test/deep v1.0.7
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v0.21.11 h1:oH9YPoR7scO9GFF/I8D0gCTOB+JP5HRK7b7cLUBRz90=
github.com/flyteorg/flyteidl v0.21.11/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb h1:l3/2D23ruD0yw4O1HAeZU0NICDP7//W+XebRHIWlU1A=
github.com/flyteorg/flyteidl v0.21.14-0.20220104154446-515675516cdb/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004 h1:LO6e9bXKLoSw+xQ+KX5iQVHSTcG11CFtZ7unHTo3FQU=
github.com/flyteorg/flyteidl v0.21.14-0.20220107104725-b573f9c00004/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flyteidl v0.21.22 h1:u83LwahAVgnJ5B3taKX7UI1QGkqLEhtYFz2wXMmBREw=
github.com/flyteorg/flyteidl v0.21.22/go.mod h1:576W2ViEyjTpT+kEVHAGbrTP3HARNUZ/eCwrNPmdx9U=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/flyteorg/flytestdlib v0.4.7 h1:SMPPXI3j/MjP7D2fqaR+lPQkTrqYS7xZbwsgJI2F8SU=
github.com/flyteorg/flytestdlib v0.4.7/go.mod h1:fv1ar34LJLMTaf0tbfetisLykUlARi7rP+NQTUn6QQs=
Expand Down
18 changes: 14 additions & 4 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
structpb "github.com/golang/protobuf/ptypes/struct"
)
Expand Down Expand Up @@ -69,6 +67,18 @@ func (p Phase) IsWaitingForResources() bool {
return p == PhaseWaitingForResources
}

type ExternalResource struct {
// A unique identifier for the external resource
ExternalID string
// A unique index for the external resource. Although the ID may change, this will remain the same
// throughout task event reports and retries.
Index uint32
// The nubmer of times this external resource has been attempted
RetryAttempt uint32
// Phase (if exists) associated with the external resource
Phase Phase
}

type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
Expand All @@ -77,8 +87,8 @@ type TaskInfo struct {
OccurredAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
// Metadata around how a task was executed
Metadata *event.TaskExecutionMetadata
// A collection of information about external resources launched by this task
ExternalResources []*ExternalResource
}

func (t *TaskInfo) String() string {
Expand Down
10 changes: 3 additions & 7 deletions go/tasks/pluginmachinery/webapi/example/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/errors"
Expand Down Expand Up @@ -96,11 +94,9 @@ func (p Plugin) Status(ctx context.Context, tCtx webapi.StatusContext) (phase co
},
},
OccurredAt: &tNow,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: []*event.ExternalResourceInfo{
{
ExternalId: "abc",
},
ExternalResources: []*core.ExternalResource{
{
ExternalID: "abc",
},
},
}), nil
Expand Down
11 changes: 10 additions & 1 deletion go/tasks/plugins/array/awsbatch/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/flyteorg/flyteplugins/go/tasks/errors"

"github.com/flyteorg/flytestdlib/bitarray"
"github.com/flyteorg/flytestdlib/logger"

arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core"
Expand Down Expand Up @@ -53,6 +54,13 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl
}

metrics.SubTasksSubmitted.Add(ctx, float64(size))

retryAttemptsArray, err := bitarray.NewCompactArray(uint(size), bitarray.Item(pluginConfig.MaxRetries))
if err != nil {
logger.Errorf(context.Background(), "Failed to create attempts compact array with [count: %v, maxValue: %v]", size, pluginConfig.MaxRetries)
return nil, err
}

parentState := currentState.
SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, 0).
SetArrayStatus(arraystatus.ArrayStatus{
Expand All @@ -61,7 +69,8 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl
},
Detailed: arrayCore.NewPhasesCompactArray(uint(size)),
}).
SetReason("Successfully launched subtasks.")
SetReason("Successfully launched subtasks.").
SetRetryAttempts(retryAttemptsArray)

nextState = currentState.SetExternalJobID(j)
nextState.State = parentState
Expand Down
5 changes: 5 additions & 0 deletions go/tasks/plugins/array/awsbatch/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package awsbatch
import (
"testing"

"github.com/flyteorg/flytestdlib/bitarray"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -110,6 +111,9 @@ func TestLaunchSubTasks(t *testing.T) {
JobDefinitionArn: "arn",
}

retryAttemptsArray, err := bitarray.NewCompactArray(5, bitarray.Item(0))
assert.NoError(t, err)

expectedState := &State{
State: &core2.State{
CurrentPhase: core2.PhaseCheckingSubTaskExecutions,
Expand All @@ -123,6 +127,7 @@ func TestLaunchSubTasks(t *testing.T) {
},
Detailed: arrayCore.NewPhasesCompactArray(5),
},
RetryAttempts: retryAttemptsArray,
},

ExternalJobID: refStr("qpxyarq"),
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/array/awsbatch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata

newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(actualPhase))
newArrayStatus.Summary.Inc(actualPhase)
parentState.RetryAttempts.SetItem(childIdx, bitarray.Item(len(subJob.Attempts)))
}

if queued > 0 {
Expand Down
8 changes: 8 additions & 0 deletions go/tasks/plugins/array/awsbatch/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func TestCheckSubTasksState(t *testing.T) {
inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

retryAttemptsArray, err := bitarray.NewCompactArray(1, bitarray.Item(1))
assert.NoError(t, err)

newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{
State: &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
Expand All @@ -146,6 +149,7 @@ func TestCheckSubTasksState(t *testing.T) {
Detailed: arrayCore.NewPhasesCompactArray(1),
},
IndexesToCache: bitarray.NewBitSet(1),
RetryAttempts: retryAttemptsArray,
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
Expand Down Expand Up @@ -180,6 +184,9 @@ func TestCheckSubTasksState(t *testing.T) {
inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

retryAttemptsArray, err := bitarray.NewCompactArray(2, bitarray.Item(1))
assert.NoError(t, err)

newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{
State: &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
Expand All @@ -190,6 +197,7 @@ func TestCheckSubTasksState(t *testing.T) {
Detailed: arrayCore.NewPhasesCompactArray(2),
},
IndexesToCache: bitarray.NewBitSet(2),
RetryAttempts: retryAttemptsArray,
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
Expand Down
34 changes: 22 additions & 12 deletions go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flytestdlib/errors"

"github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"
Expand Down Expand Up @@ -52,6 +50,9 @@ type State struct {

// Which sub-tasks to cache, (using the original index, that is, the length is ArrayJob.size)
IndexesToCache *bitarray.BitSet `json:"indexesToCache"`

// Tracks the number of subtask retries using the execution index
RetryAttempts bitarray.CompactArray `json:"retryAttempts"`
}

func (s State) GetReason() string {
Expand Down Expand Up @@ -111,6 +112,11 @@ func (s *State) SetReason(reason string) *State {
return s
}

func (s *State) SetRetryAttempts(retryAttempts bitarray.CompactArray) *State {
s.RetryAttempts = retryAttempts
return s
}

func (s *State) SetExecutionArraySize(size int) *State {
s.ExecutionArraySize = size
return s
Expand Down Expand Up @@ -171,20 +177,24 @@ func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 {
// handling as we don't have to keep an ever growing list of log links (our batch jobs can be 5000 sub-tasks, keeping
// all the log links takes up a lot of space).
func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string) (core.PhaseInfo, error) {

phaseInfo := core.PhaseInfoUndefined
t := time.Now()

nowTaskInfo := &core.TaskInfo{
OccurredAt: &t,
Logs: logLinks,
}
if nowTaskInfo.Metadata == nil {
nowTaskInfo.Metadata = &event.TaskExecutionMetadata{}
OccurredAt: &t,
Logs: logLinks,
ExternalResources: make([]*core.ExternalResource, len(subTaskIDs)),
}
for _, subTaskID := range subTaskIDs {
nowTaskInfo.Metadata.ExternalResources = append(nowTaskInfo.Metadata.ExternalResources, &event.ExternalResourceInfo{
ExternalId: *subTaskID,
})

for childIndex, subTaskID := range subTaskIDs {
originalIndex := CalculateOriginalIndex(childIndex, state.GetIndexesToCache())

nowTaskInfo.ExternalResources[childIndex] = &core.ExternalResource{
ExternalID: *subTaskID,
Index: uint32(originalIndex),
RetryAttempt: uint32(state.RetryAttempts.GetItem(childIndex)),
Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIndex)],
}
}

switch p, version := state.GetPhase(); p {
Expand Down
Loading