Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
added CleanupOnFailure support for TaskNodeStatus to support aborting…
Browse files Browse the repository at this point in the history
… failed task nodes

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Mar 21, 2023
1 parent 48c37de commit a77056b
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 7 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.9
github.com/flyteorg/flyteidl v1.3.12
github.com/flyteorg/flyteplugins v1.0.40
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
Expand Down Expand Up @@ -147,3 +147,5 @@ require (
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d

replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.43-0.20230321073949-b11326754ed4
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk=
github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s=
github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio=
github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q=
github.com/flyteorg/flyteidl v1.3.12/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.43-0.20230321073949-b11326754ed4 h1:6jfUpCYxYGilPgLZqmLF9/J5nLDHrXGsUeO0B6EdMzw=
github.com/flyteorg/flyteplugins v1.0.43-0.20230321073949-b11326754ed4/go.mod h1:5in2y2zWO6fbheoPJ44wNRppfVpjkWXCs0dy+oA232o=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ type ExecutableTaskNodeStatus interface {
GetBarrierClockTick() uint32
GetLastPhaseUpdatedAt() time.Time
GetPreviousNodeExecutionCheckpointPath() DataReference
GetCleanupOnFailure() bool
}

type MutableTaskNodeStatus interface {
Expand All @@ -371,6 +372,7 @@ type MutableTaskNodeStatus interface {
SetPluginStateVersion(uint32)
SetBarrierClockTick(tick uint32)
SetPreviousNodeExecutionCheckpointPath(DataReference)
SetCleanupOnFailure(bool)
}

// ExecutableWorkflowNode is an interface for a Child Workflow Node
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ type TaskNodeStatus struct {
BarrierClockTick uint32 `json:"tick,omitempty"`
LastPhaseUpdatedAt time.Time `json:"updAt,omitempty"`
PreviousNodeExecutionCheckpointPath DataReference `json:"checkpointPath,omitempty"`
CleanupOnFailure bool `json:"clean,omitempty"`
}

func (in *TaskNodeStatus) GetBarrierClockTick() uint32 {
Expand Down Expand Up @@ -795,6 +796,11 @@ func (in *TaskNodeStatus) SetPluginStateVersion(v uint32) {
in.SetDirty()
}

func (in *TaskNodeStatus) SetCleanupOnFailure(cleanupOnFailure bool) {
in.CleanupOnFailure = cleanupOnFailure
in.SetDirty()
}

func (in *TaskNodeStatus) GetPluginState() []byte {
return in.PluginState
}
Expand Down Expand Up @@ -829,6 +835,10 @@ func (in TaskNodeStatus) GetPhaseVersion() uint32 {
return in.PhaseVersion
}

func (in TaskNodeStatus) GetCleanupOnFailure() bool {
return in.CleanupOnFailure
}

func (in *TaskNodeStatus) UpdatePhase(phase int, phaseVersion uint32) {
if in.Phase != phase || in.PhaseVersion != phaseVersion {
in.SetDirty()
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type TaskNodeState struct {
BarrierClockTick uint32
LastPhaseUpdatedAt time.Time
PreviousNodeExecutionCheckpointURI storage.DataReference
CleanupOnFailure bool
}

type BranchNodeState struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/node_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
BarrierClockTick: tn.GetBarrierClockTick(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
PreviousNodeExecutionCheckpointURI: tn.GetPreviousNodeExecutionCheckpointPath(),
CleanupOnFailure: tn.GetCleanupOnFailure(),
}
}
return handler.TaskNodeState{}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
BarrierClockTick: barrierTick,
LastPhaseUpdatedAt: time.Now(),
PreviousNodeExecutionCheckpointURI: ts.PreviousNodeExecutionCheckpointURI,
CleanupOnFailure: ts.CleanupOnFailure || pluginTrns.pInfo.CleanupOnFailure(),
})
if err != nil {
logger.Errorf(ctx, "Failed to store TaskNode state, err :%s", err.Error())
Expand All @@ -789,10 +790,11 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
}

func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, reason string) error {
currentPhase := nCtx.NodeStateReader().GetTaskNodeState().PluginPhase
taskNodeState := nCtx.NodeStateReader().GetTaskNodeState()
currentPhase := taskNodeState.PluginPhase
logger.Debugf(ctx, "Abort invoked with phase [%v]", currentPhase)

if currentPhase.IsTerminal() {
if currentPhase.IsTerminal() && !(currentPhase.IsFailure() && taskNodeState.CleanupOnFailure) {
logger.Debugf(ctx, "Returning immediately from Abort since task is already in terminal phase.", currentPhase)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa
t.SetPluginStateVersion(n.t.PluginStateVersion)
t.SetBarrierClockTick(n.t.BarrierClockTick)
t.SetPreviousNodeExecutionCheckpointPath(n.t.PreviousNodeExecutionCheckpointURI)
t.SetCleanupOnFailure(n.t.CleanupOnFailure)
}

// Update dynamic node status
Expand Down

0 comments on commit a77056b

Please sign in to comment.