Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Track RetryAttempt and Phase of ExternalResources (#231)
Browse files Browse the repository at this point in the history
* replaced Metadata proto in TaskInfo with ExternalResource array

Signed-off-by: Daniel Rammer <[email protected]>

* added ExternalResource documentation comments

Signed-off-by: Daniel Rammer <[email protected]>

* setting retry attempt on external resources

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests and lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* tracking RetryAttempt for k8s array plugin using a CompactArray

Signed-off-by: Daniel Rammer <[email protected]>

* added a few comments

Signed-off-by: Daniel Rammer <[email protected]>

* setting RetryAttempts for awsbatch subtasks

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests and lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* populating external resource index with original index

Signed-off-by: Daniel Rammer <[email protected]>

* updated comments

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl version

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jan 24, 2022
1 parent 84cef4a commit f594e15
Show file tree
Hide file tree
Showing 21 changed files with 193 additions and 123 deletions.
18 changes: 14 additions & 4 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
structpb "github.com/golang/protobuf/ptypes/struct"
)
Expand Down Expand Up @@ -69,6 +67,18 @@ func (p Phase) IsWaitingForResources() bool {
return p == PhaseWaitingForResources
}

type ExternalResource struct {
// A unique identifier for the external resource
ExternalID string
// A unique index for the external resource. Although the ID may change, this will remain the same
// throughout task event reports and retries.
Index uint32
// The nubmer of times this external resource has been attempted
RetryAttempt uint32
// Phase (if exists) associated with the external resource
Phase Phase
}

type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
Expand All @@ -77,8 +87,8 @@ type TaskInfo struct {
OccurredAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
// Metadata around how a task was executed
Metadata *event.TaskExecutionMetadata
// A collection of information about external resources launched by this task
ExternalResources []*ExternalResource
}

func (t *TaskInfo) String() string {
Expand Down
10 changes: 3 additions & 7 deletions flyteplugins/go/tasks/pluginmachinery/webapi/example/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

idlCore "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/errors"
Expand Down Expand Up @@ -96,11 +94,9 @@ func (p Plugin) Status(ctx context.Context, tCtx webapi.StatusContext) (phase co
},
},
OccurredAt: &tNow,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: []*event.ExternalResourceInfo{
{
ExternalId: "abc",
},
ExternalResources: []*core.ExternalResource{
{
ExternalID: "abc",
},
},
}), nil
Expand Down
11 changes: 10 additions & 1 deletion flyteplugins/go/tasks/plugins/array/awsbatch/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/flyteorg/flyteplugins/go/tasks/errors"

"github.com/flyteorg/flytestdlib/bitarray"
"github.com/flyteorg/flytestdlib/logger"

arrayCore "github.com/flyteorg/flyteplugins/go/tasks/plugins/array/core"
Expand Down Expand Up @@ -53,6 +54,13 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl
}

metrics.SubTasksSubmitted.Add(ctx, float64(size))

retryAttemptsArray, err := bitarray.NewCompactArray(uint(size), bitarray.Item(pluginConfig.MaxRetries))
if err != nil {
logger.Errorf(context.Background(), "Failed to create attempts compact array with [count: %v, maxValue: %v]", size, pluginConfig.MaxRetries)
return nil, err
}

parentState := currentState.
SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, 0).
SetArrayStatus(arraystatus.ArrayStatus{
Expand All @@ -61,7 +69,8 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl
},
Detailed: arrayCore.NewPhasesCompactArray(uint(size)),
}).
SetReason("Successfully launched subtasks.")
SetReason("Successfully launched subtasks.").
SetRetryAttempts(retryAttemptsArray)

nextState = currentState.SetExternalJobID(j)
nextState.State = parentState
Expand Down
5 changes: 5 additions & 0 deletions flyteplugins/go/tasks/plugins/array/awsbatch/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package awsbatch
import (
"testing"

"github.com/flyteorg/flytestdlib/bitarray"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -110,6 +111,9 @@ func TestLaunchSubTasks(t *testing.T) {
JobDefinitionArn: "arn",
}

retryAttemptsArray, err := bitarray.NewCompactArray(5, bitarray.Item(0))
assert.NoError(t, err)

expectedState := &State{
State: &core2.State{
CurrentPhase: core2.PhaseCheckingSubTaskExecutions,
Expand All @@ -123,6 +127,7 @@ func TestLaunchSubTasks(t *testing.T) {
},
Detailed: arrayCore.NewPhasesCompactArray(5),
},
RetryAttempts: retryAttemptsArray,
},

ExternalJobID: refStr("qpxyarq"),
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata

newArrayStatus.Detailed.SetItem(childIdx, bitarray.Item(actualPhase))
newArrayStatus.Summary.Inc(actualPhase)
parentState.RetryAttempts.SetItem(childIdx, bitarray.Item(len(subJob.Attempts)))
}

if queued > 0 {
Expand Down
8 changes: 8 additions & 0 deletions flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func TestCheckSubTasksState(t *testing.T) {
inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

retryAttemptsArray, err := bitarray.NewCompactArray(1, bitarray.Item(1))
assert.NoError(t, err)

newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{
State: &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
Expand All @@ -146,6 +149,7 @@ func TestCheckSubTasksState(t *testing.T) {
Detailed: arrayCore.NewPhasesCompactArray(1),
},
IndexesToCache: bitarray.NewBitSet(1),
RetryAttempts: retryAttemptsArray,
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
Expand Down Expand Up @@ -180,6 +184,9 @@ func TestCheckSubTasksState(t *testing.T) {
inMemDatastore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

retryAttemptsArray, err := bitarray.NewCompactArray(2, bitarray.Item(1))
assert.NoError(t, err)

newState, err := CheckSubTasksState(ctx, tMeta, "", "", jobStore, inMemDatastore, &config.Config{}, &State{
State: &arrayCore.State{
CurrentPhase: arrayCore.PhaseCheckingSubTaskExecutions,
Expand All @@ -190,6 +197,7 @@ func TestCheckSubTasksState(t *testing.T) {
Detailed: arrayCore.NewPhasesCompactArray(2),
},
IndexesToCache: bitarray.NewBitSet(2),
RetryAttempts: retryAttemptsArray,
},
ExternalJobID: refStr("job-id"),
JobDefinitionArn: "",
Expand Down
34 changes: 22 additions & 12 deletions flyteplugins/go/tasks/plugins/array/core/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"

"github.com/flyteorg/flytestdlib/errors"

"github.com/flyteorg/flyteplugins/go/tasks/plugins/array/arraystatus"
Expand Down Expand Up @@ -52,6 +50,9 @@ type State struct {

// Which sub-tasks to cache, (using the original index, that is, the length is ArrayJob.size)
IndexesToCache *bitarray.BitSet `json:"indexesToCache"`

// Tracks the number of subtask retries using the execution index
RetryAttempts bitarray.CompactArray `json:"retryAttempts"`
}

func (s State) GetReason() string {
Expand Down Expand Up @@ -111,6 +112,11 @@ func (s *State) SetReason(reason string) *State {
return s
}

func (s *State) SetRetryAttempts(retryAttempts bitarray.CompactArray) *State {
s.RetryAttempts = retryAttempts
return s
}

func (s *State) SetExecutionArraySize(size int) *State {
s.ExecutionArraySize = size
return s
Expand Down Expand Up @@ -171,20 +177,24 @@ func GetPhaseVersionOffset(currentPhase Phase, length int64) uint32 {
// handling as we don't have to keep an ever growing list of log links (our batch jobs can be 5000 sub-tasks, keeping
// all the log links takes up a lot of space).
func MapArrayStateToPluginPhase(_ context.Context, state *State, logLinks []*idlCore.TaskLog, subTaskIDs []*string) (core.PhaseInfo, error) {

phaseInfo := core.PhaseInfoUndefined
t := time.Now()

nowTaskInfo := &core.TaskInfo{
OccurredAt: &t,
Logs: logLinks,
}
if nowTaskInfo.Metadata == nil {
nowTaskInfo.Metadata = &event.TaskExecutionMetadata{}
OccurredAt: &t,
Logs: logLinks,
ExternalResources: make([]*core.ExternalResource, len(subTaskIDs)),
}
for _, subTaskID := range subTaskIDs {
nowTaskInfo.Metadata.ExternalResources = append(nowTaskInfo.Metadata.ExternalResources, &event.ExternalResourceInfo{
ExternalId: *subTaskID,
})

for childIndex, subTaskID := range subTaskIDs {
originalIndex := CalculateOriginalIndex(childIndex, state.GetIndexesToCache())

nowTaskInfo.ExternalResources[childIndex] = &core.ExternalResource{
ExternalID: *subTaskID,
Index: uint32(originalIndex),
RetryAttempt: uint32(state.RetryAttempts.GetItem(childIndex)),
Phase: core.Phases[state.ArrayStatus.Detailed.GetItem(childIndex)],
}
}

switch p, version := state.GetPhase(); p {
Expand Down
Loading

0 comments on commit f594e15

Please sign in to comment.