Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Added support for aborting task nodes reported as failures #541

Merged
merged 5 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flyteplugins v1.0.49
github.com/flyteorg/flyteplugins v1.0.52
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYF
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.49 h1:lUmT4kqYamkJY2tO6nCWRCnVv2M2QNLIap5bFYAol7s=
github.com/flyteorg/flyteplugins v1.0.49/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flyteplugins v1.0.52 h1:AWNrRYgm0bCzOws+bIfJDfPBZqBmTdABxW78r8q3kP4=
github.com/flyteorg/flyteplugins v1.0.52/go.mod h1:ztsonku5fKwyxcIg1k69PTiBVjRI6d3nK5DnC+iwx08=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ type ExecutableTaskNodeStatus interface {
GetBarrierClockTick() uint32
GetLastPhaseUpdatedAt() time.Time
GetPreviousNodeExecutionCheckpointPath() DataReference
GetCleanupOnFailure() bool
}

type MutableTaskNodeStatus interface {
Expand All @@ -371,6 +372,7 @@ type MutableTaskNodeStatus interface {
SetPluginStateVersion(uint32)
SetBarrierClockTick(tick uint32)
SetPreviousNodeExecutionCheckpointPath(DataReference)
SetCleanupOnFailure(bool)
}

// ExecutableWorkflowNode is an interface for a Child Workflow Node
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/mocks/MutableTaskNodeStatus.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ type TaskNodeStatus struct {
BarrierClockTick uint32 `json:"tick,omitempty"`
LastPhaseUpdatedAt time.Time `json:"updAt,omitempty"`
PreviousNodeExecutionCheckpointPath DataReference `json:"checkpointPath,omitempty"`
CleanupOnFailure bool `json:"clean,omitempty"`
}

func (in *TaskNodeStatus) GetBarrierClockTick() uint32 {
Expand Down Expand Up @@ -795,6 +796,11 @@ func (in *TaskNodeStatus) SetPluginStateVersion(v uint32) {
in.SetDirty()
}

// SetCleanupOnFailure records the cleanupOnFailure flag on the task node
// status and marks the status as dirty via SetDirty.
func (in *TaskNodeStatus) SetCleanupOnFailure(cleanupOnFailure bool) {
in.CleanupOnFailure = cleanupOnFailure
in.SetDirty()
}

// GetPluginState returns the PluginState bytes stored on the task node status.
// The returned slice is the stored field itself, not a copy.
func (in *TaskNodeStatus) GetPluginState() []byte {
return in.PluginState
}
Expand Down Expand Up @@ -829,6 +835,10 @@ func (in TaskNodeStatus) GetPhaseVersion() uint32 {
return in.PhaseVersion
}

// GetCleanupOnFailure returns the CleanupOnFailure flag stored on the task
// node status.
func (in TaskNodeStatus) GetCleanupOnFailure() bool {
return in.CleanupOnFailure
}

func (in *TaskNodeStatus) UpdatePhase(phase int, phaseVersion uint32) {
if in.Phase != phase || in.PhaseVersion != phaseVersion {
in.SetDirty()
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type TaskNodeState struct {
PluginStateVersion uint32
LastPhaseUpdatedAt time.Time
PreviousNodeExecutionCheckpointURI storage.DataReference
CleanupOnFailure bool
}

type BranchNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
PluginState: tn.GetPluginState(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
PreviousNodeExecutionCheckpointURI: tn.GetPreviousNodeExecutionCheckpointPath(),
CleanupOnFailure: tn.GetCleanupOnFailure(),
}
}
return handler.TaskNodeState{}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
PluginPhaseVersion: pluginTrns.pInfo.Version(),
LastPhaseUpdatedAt: time.Now(),
PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
CleanupOnFailure: ts.CleanupOnFailure || pluginTrns.pInfo.CleanupOnFailure(),
})
if err != nil {
logger.Errorf(ctx, "Failed to store TaskNode state, err :%s", err.Error())
Expand All @@ -761,10 +762,11 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
}

func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, reason string) error {
currentPhase := nCtx.NodeStateReader().GetTaskNodeState().PluginPhase
taskNodeState := nCtx.NodeStateReader().GetTaskNodeState()
currentPhase := taskNodeState.PluginPhase
logger.Debugf(ctx, "Abort invoked with phase [%v]", currentPhase)

if currentPhase.IsTerminal() {
if currentPhase.IsTerminal() && !(currentPhase.IsFailure() && taskNodeState.CleanupOnFailure) {
logger.Debugf(ctx, "Returning immediately from Abort since task is already in terminal phase.", currentPhase)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t.SetPluginState(n.t.PluginState)
t.SetPluginStateVersion(n.t.PluginStateVersion)
t.SetPreviousNodeExecutionCheckpointPath(n.t.PreviousNodeExecutionCheckpointURI)
t.SetCleanupOnFailure(n.t.CleanupOnFailure)
}

// Update dynamic node status
Expand Down