Skip to content

Commit

Permalink
Added support for aborting task nodes reported as failures (flyteorg#541
Browse files Browse the repository at this point in the history
)

* added CleanupOnFailure support for TaskNodeStatus to support aborting failed task nodes

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins and generated

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteplugins

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Apr 28, 2023
1 parent 0487891 commit d84d9df
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 5 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteplugins v1.0.49
github.com/flyteorg/flyteplugins v1.0.52
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.49 h1:lUmT4kqYamkJY2tO6nCWRCnVv2M2QNLIap5bFYAol7s=
github.com/flyteorg/flyteplugins v1.0.49/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flyteplugins v1.0.52 h1:AWNrRYgm0bCzOws+bIfJDfPBZqBmTdABxW78r8q3kP4=
github.com/flyteorg/flyteplugins v1.0.52/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ type ExecutableTaskNodeStatus interface {
GetBarrierClockTick() uint32
GetLastPhaseUpdatedAt() time.Time
GetPreviousNodeExecutionCheckpointPath() DataReference
GetCleanupOnFailure() bool
}

type MutableTaskNodeStatus interface {
Expand All @@ -371,6 +372,7 @@ type MutableTaskNodeStatus interface {
SetPluginStateVersion(uint32)
SetBarrierClockTick(tick uint32)
SetPreviousNodeExecutionCheckpointPath(DataReference)
SetCleanupOnFailure(bool)
}

// ExecutableWorkflowNode is an interface for a Child Workflow Node
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ type TaskNodeStatus struct {
BarrierClockTick uint32 `json:"tick,omitempty"`
LastPhaseUpdatedAt time.Time `json:"updAt,omitempty"`
PreviousNodeExecutionCheckpointPath DataReference `json:"checkpointPath,omitempty"`
CleanupOnFailure bool `json:"clean,omitempty"`
}

func (in *TaskNodeStatus) GetBarrierClockTick() uint32 {
Expand Down Expand Up @@ -795,6 +796,11 @@ func (in *TaskNodeStatus) SetPluginStateVersion(v uint32) {
in.SetDirty()
}

func (in *TaskNodeStatus) SetCleanupOnFailure(cleanupOnFailure bool) {
in.CleanupOnFailure = cleanupOnFailure
in.SetDirty()
}

func (in *TaskNodeStatus) GetPluginState() []byte {
return in.PluginState
}
Expand Down Expand Up @@ -829,6 +835,10 @@ func (in TaskNodeStatus) GetPhaseVersion() uint32 {
return in.PhaseVersion
}

func (in TaskNodeStatus) GetCleanupOnFailure() bool {
return in.CleanupOnFailure
}

func (in *TaskNodeStatus) UpdatePhase(phase int, phaseVersion uint32) {
if in.Phase != phase || in.PhaseVersion != phaseVersion {
in.SetDirty()
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type TaskNodeState struct {
PluginStateVersion uint32
LastPhaseUpdatedAt time.Time
PreviousNodeExecutionCheckpointURI storage.DataReference
CleanupOnFailure bool
}

type BranchNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginState: tn.GetPluginState(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
PreviousNodeExecutionCheckpointURI: tn.GetPreviousNodeExecutionCheckpointPath(),
CleanupOnFailure: tn.GetCleanupOnFailure(),
}
}
return handler.TaskNodeState{}
Expand Down
6 changes: 4 additions & 2 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginPhaseVersion: pluginTrns.pInfo.Version(),
LastPhaseUpdatedAt: time.Now(),
PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
CleanupOnFailure: ts.CleanupOnFailure || pluginTrns.pInfo.CleanupOnFailure(),
})
if err != nil {
logger.Errorf(ctx, "Failed to store TaskNode state, err :%s", err.Error())
Expand All @@ -761,10 +762,11 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
}

func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, reason string) error {
currentPhase := nCtx.NodeStateReader().GetTaskNodeState().PluginPhase
taskNodeState := nCtx.NodeStateReader().GetTaskNodeState()
currentPhase := taskNodeState.PluginPhase
logger.Debugf(ctx, "Abort invoked with phase [%v]", currentPhase)

if currentPhase.IsTerminal() {
if currentPhase.IsTerminal() && !(currentPhase.IsFailure() && taskNodeState.CleanupOnFailure) {
logger.Debugf(ctx, "Returning immediately from Abort since task is already in terminal phase.", currentPhase)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t.SetPluginState(n.t.PluginState)
t.SetPluginStateVersion(n.t.PluginStateVersion)
t.SetPreviousNodeExecutionCheckpointPath(n.t.PreviousNodeExecutionCheckpointURI)
t.SetCleanupOnFailure(n.t.CleanupOnFailure)
}

// Update dynamic node status
Expand Down

0 comments on commit d84d9df

Please sign in to comment.