Skip to content

Commit

Permalink
Option to clear node state on any termination (#4596)
Browse files: browse the repository at this point in the history
* `ClearSubNodeStatus` on failure

Signed-off-by: Thomas Newton <[email protected]>

* More aggressive collapsing

Signed-off-by: Thomas Newton <[email protected]>

* Tidy

Signed-off-by: Thomas Newton <[email protected]>

* Fix panic

Signed-off-by: Thomas Newton <[email protected]>

* Tidy

Signed-off-by: Thomas Newton <[email protected]>

* Handle possibility of nil startedAt time

Signed-off-by: Thomas Newton <[email protected]>

* Update test assertions

Signed-off-by: Thomas Newton <[email protected]>

* Config flag attempt 1

Signed-off-by: Thomas Newton <[email protected]>

* Update more calls to UpdatePhase

Signed-off-by: Thomas Newton <[email protected]>

* Update generated code

Signed-off-by: Thomas Newton <[email protected]>

* Update tests

Signed-off-by: Thomas Newton <[email protected]>

* Fix more tests

Signed-off-by: Thomas Newton <[email protected]>

* Fix tests

Signed-off-by: Thomas Newton <[email protected]>

* Rename clear-state-on-termination -> clear-state-on-any-termination

Signed-off-by: Thomas Newton <[email protected]>

* Add a comment

Signed-off-by: Thomas Newton <[email protected]>

* Missing test coverage - add test case `non-terminal-timing-out`

Signed-off-by: Thomas Newton <[email protected]>

* Add test coverage for `TestNodeExecutor_RecursiveNodeHandler_Recurse` with `clearStateOnAnyTermination=true`

Signed-off-by: Thomas Newton <[email protected]>

* Address another TODO

Signed-off-by: Thomas Newton <[email protected]>

* Add comment

Signed-off-by: Thomas Newton <[email protected]>

* Rename the config option to `enable-cr-debug-metadata`

Signed-off-by: Thomas Newton <[email protected]>

---------

Signed-off-by: Thomas Newton <[email protected]>
  • Loading branch information
Tom-Newton authored Jan 16, 2024
1 parent 136ac8d commit d208ed3
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 189 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ type MutableNodeStatus interface {
SetOutputDir(d DataReference)
SetParentNodeID(n *NodeID)
SetParentTaskID(t *core.TaskExecutionIdentifier)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, err *core.ExecutionError)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError)
IncrementAttempts() uint32
IncrementSystemFailures() uint32
SetCached()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 22 additions & 25 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ func (in *NodeStatus) GetOrCreateArrayNodeStatus() MutableArrayNodeStatus {
return in.ArrayNodeStatus
}

func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, err *core.ExecutionError) {
func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason string, enableCRDebugMetadata bool, err *core.ExecutionError) {
if in.Phase == p && in.Message == reason {
// We will not update the phase multiple times. This prevents the comparison from returning false positive
return
Expand All @@ -607,6 +607,7 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st
}

n := occurredAt
in.LastUpdatedAt = &n
if occurredAt.IsZero() {
n = metav1.Now()
}
Expand All @@ -625,35 +626,31 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st
in.LastAttemptStartedAt = &n
}
} else if IsPhaseTerminal(p) {
// If we are in terminal phase then we will clear out all our fields as they are not required anymore
// Only thing required is stopped at and lastupdatedat time
if in.StoppedAt == nil {
in.StoppedAt = &n
}
if in.StartedAt == nil {
in.StartedAt = &n
}
if in.LastAttemptStartedAt == nil {
in.LastAttemptStartedAt = &n
if p == NodePhaseSucceeded || p == NodePhaseSkipped || !enableCRDebugMetadata {
// Clear most status related fields after reaching a terminal state. This keeps the CR state small to avoid
// etcd size limits. Importantly we keep Phase, StoppedAt and Error which will be needed further.
in.Message = ""
in.QueuedAt = nil
in.StartedAt = nil
in.LastUpdatedAt = nil
in.LastAttemptStartedAt = nil
in.DynamicNodeStatus = nil
in.BranchStatus = nil
in.SubNodeStatus = nil
in.TaskNodeStatus = nil
in.WorkflowNodeStatus = nil
} else {
if in.StartedAt == nil {
in.StartedAt = &n
}
if in.LastAttemptStartedAt == nil {
in.LastAttemptStartedAt = &n
}
}
}
in.LastUpdatedAt = &n

// For cases in which the node is either Succeeded or Skipped we clear most fields from the status
// except for StoppedAt and Phase. StoppedAt is used to calculate transition latency between this node and
// any downstream nodes and Phase is required for propeller to continue to downstream nodes.
if p == NodePhaseSucceeded || p == NodePhaseSkipped {
in.Message = ""
in.QueuedAt = nil
in.StartedAt = nil
in.LastAttemptStartedAt = nil
in.DynamicNodeStatus = nil
in.BranchStatus = nil
in.SubNodeStatus = nil
in.TaskNodeStatus = nil
in.WorkflowNodeStatus = nil
in.LastUpdatedAt = nil
}
in.SetDirty()
}

Expand Down
Loading

0 comments on commit d208ed3

Please sign in to comment.