From 0f10175a4bfd1186fa50314aa236017c881263b1 Mon Sep 17 00:00:00 2001 From: Paul Dittamo <37558497+pvditt@users.noreply.github.com> Date: Fri, 23 Aug 2024 12:07:05 -0700 Subject: [PATCH] [BUG] use deep copy of bit arrays when getting array node state (#5681) * [BUG] add retries to handle array node eventing race condition (#421) If there is an error updating a [FlyteWorkflow CRD](https://github.com/unionai/flyte/blob/6a7207c5345604a28a9d4e3699becff767f520f5/flytepropeller/pkg/controller/handler.go#L378), then the propeller streak ends without the CRD getting updated and the in-memory copy of the FlyteWorkflow is not utilized on the next loop. [TaskPhaseVersion](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go#L239) is stored in the FlyteWorkflow. This is incremented when there is an update to node/subnode state to ensure that events are unique. If the events stay in the same state and have the same TaskPhaseVersion, then they [get short-circuited and don't get emitted to admin](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/events/admin_eventsink.go#L59) or will get returned as an [AlreadyExists error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flyteadmin/pkg/manager/impl/task_execution_manager.go#L172) and get [handled in propeller to not bubble up in an error](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/nodes/node_exec_context.go#L38). 
We can run into issues with ArrayNode eventing when: - array node handler increments task phase version from "0" to "1" - admin event sink emits event with version "1" - the propeller controller is not able to update the FlyteWorkflow CRD, so the ArrayNodeStatus indicates taskPhaseVersion is still 0 - next loop, array node handler increments task phase version from "0" to "1" - admin event sink prevents the event from getting emitted because an event with the same ID has already been received. No error is bubbled up. This means we lose subnode state until there is an event that contains an update to that subnode. If the lost state is the subnode reaching a terminal state, then the subnode state (from admin/UI) is "stuck" in a non-terminal state. I confirmed this to be an issue in the load-test-cluster. Whenever there was an [error syncing the FlyteWorkflow](https://github.com/flyteorg/flyte/blob/37b4e13ac4a3594ac63b7a35058f4b2220e51282/flytepropeller/pkg/controller/workers.go#L91), the next round of eventing in ArrayNode would fail unless the ArrayNode phase changed. - added unit test - tested locally in sandbox - tested in dogfood - https://buildkite.com/unionai/managed-cluster-staging-sync/builds/4398#01914a1a-f6d6-42a5-b41b-7b6807f27370 - should be fine to roll out to prod Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). 
- [x] To be upstreamed to OSS fixes: https://linear.app/unionai/issue/COR-1534/bug-arraynode-shows-non-complete-jobs-in-ui-when-the-job-is-actually * [x] Added tests * [x] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation Signed-off-by: Paul Dittamo * handle already exists error on array node abort (#427) * handle already exists error on array node abort Signed-off-by: Paul Dittamo * update comment Signed-off-by: Paul Dittamo --------- Signed-off-by: Paul Dittamo * [BUG] set cause for already exists EventError (#432) * set cause for already exists EventError Signed-off-by: Paul Dittamo * add nil check event error Signed-off-by: Paul Dittamo * lint Signed-off-by: Paul Dittamo --------- Signed-off-by: Paul Dittamo * add deep copy for array node status Signed-off-by: Paul Dittamo * add deep copy for array node status Signed-off-by: Paul Dittamo * use deep copy of bit arrays when getting array node state Signed-off-by: Paul Dittamo * Revert "add deep copy for array node status" This reverts commit dde75951d87cb497e358a5bd0ff27f05078f5b72. 
Signed-off-by: Paul Dittamo * ignore ErrorOnAlreadyExists when marshalling event config Signed-off-by: Paul Dittamo --------- Signed-off-by: Paul Dittamo Signed-off-by: pmahindrakar-oss --- .../flyteworkflow/v1alpha1/node_status.go | 21 ++++++ .../v1alpha1/zz_generated.deepcopy.go | 4 ++ .../pkg/controller/config/config.go | 3 +- .../pkg/controller/config/config_flags.go | 2 +- .../controller/config/config_flags_test.go | 6 +- .../controller/nodes/array/handler_test.go | 70 ++++++++++++++++--- .../controller/nodes/node_state_manager.go | 24 +++++-- 7 files changed, 110 insertions(+), 20 deletions(-) diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index aab034224d7..218b0455882 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -316,6 +316,27 @@ func (in *ArrayNodeStatus) SetTaskPhaseVersion(taskPhaseVersion uint32) { } } +func (in *ArrayNodeStatus) DeepCopyInto(out *ArrayNodeStatus) { + *out = *in + out.MutableStruct = in.MutableStruct + + if in.ExecutionError != nil { + in, out := &in.ExecutionError, &out.ExecutionError + *out = new(core.ExecutionError) + *out = *in + } +} + +func (in *ArrayNodeStatus) DeepCopy() *ArrayNodeStatus { + if in == nil { + return nil + } + + out := &ArrayNodeStatus{} + in.DeepCopyInto(out) + return out +} + type NodeStatus struct { MutableStruct Phase NodePhase `json:"phase,omitempty"` diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go index febbca733c2..95cac582b81 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go @@ -548,6 +548,10 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { *out = new(DynamicNodeStatus) 
(*in).DeepCopyInto(*out) } + if in.ArrayNodeStatus != nil { + in, out := &in.ArrayNodeStatus, &out.ArrayNodeStatus + *out = (*in).DeepCopy() + } if in.Error != nil { in, out := &in.Error, &out.Error *out = (*in).DeepCopy() diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 81489da17c8..0e3da1bd118 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -278,7 +278,8 @@ const ( type EventConfig struct { RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."` FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."` - ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."` + // only meant to be overridden for certain node types that have different eventing behavior such as ArrayNode + ErrorOnAlreadyExists bool `json:"-"` } // ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index d2dc0971ffd..858fc8a8ba8 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -100,7 +100,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") 
cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.error-on-already-exists"), defaultConfig.EventConfig.ErrorOnAlreadyExists, "Whether to return an error when an event already exists.") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.-"), defaultConfig.EventConfig.ErrorOnAlreadyExists, "") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "include-shard-key-label"), defaultConfig.IncludeShardKeyLabel, "Include the specified shard key label in the k8s FlyteWorkflow CRD label selector") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "exclude-shard-key-label"), defaultConfig.ExcludeShardKeyLabel, "Exclude the specified shard key label from the k8s FlyteWorkflow CRD label selector") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "include-project-label"), defaultConfig.IncludeProjectLabel, "Include the specified project label in the k8s FlyteWorkflow CRD label selector") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index 66a14381af6..27e7b76efac 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -799,13 +799,13 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_event-config.error-on-already-exists", func(t *testing.T) { + t.Run("Test_event-config.-", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("event-config.error-on-already-exists", testValue) - if vBool, err := cmdFlags.GetBool("event-config.error-on-already-exists"); err == nil { + cmdFlags.Set("event-config.-", testValue) + if vBool, err := cmdFlags.GetBool("event-config.-"); err == nil { testDecodeJson_Config(t, 
fmt.Sprintf("%v", vBool), &actual.EventConfig.ErrorOnAlreadyExists) } else { diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index d27b412c1f5..648d70e36c4 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -539,7 +539,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -559,7 +563,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { subNodeTransitions: []handler.Transition{ handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseQueued, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, @@ -580,7 +588,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, 
handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -601,7 +613,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { subNodeTransitions: []handler.Transition{ handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseQueued, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, @@ -619,8 +635,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { core.PhaseUndefined, core.PhaseUndefined, }, - subNodeTransitions: []handler.Transition{}, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + subNodeTransitions: []handler.Transition{}, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{}, @@ -642,7 +662,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, 
handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -663,7 +687,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseSucceeded, + v1alpha1.NodePhaseSucceeded, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_SUCCEEDED}, @@ -684,7 +712,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseSucceeded, + v1alpha1.NodePhaseFailed, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_FAILED}, @@ -704,7 +736,11 @@ 
func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseFailed, + v1alpha1.NodePhaseSucceeded, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_FAILED, idlcore.TaskExecution_SUCCEEDED}, @@ -724,7 +760,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 2, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -749,6 +789,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, 
useFakeEventRecorder: true, eventRecorderFailures: 5, @@ -771,6 +815,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, useFakeEventRecorder: true, eventRecorderError: fmt.Errorf("err"), diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index 91cf1f26799..a9ead9afc30 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -160,11 +160,27 @@ func (n nodeStateManager) GetArrayNodeState() handler.ArrayNodeState { if an != nil { as.Phase = an.GetArrayNodePhase() as.Error = an.GetExecutionError() - as.SubNodePhases = an.GetSubNodePhases() - as.SubNodeTaskPhases = an.GetSubNodeTaskPhases() - as.SubNodeRetryAttempts = an.GetSubNodeRetryAttempts() - as.SubNodeSystemFailures = an.GetSubNodeSystemFailures() as.TaskPhaseVersion = an.GetTaskPhaseVersion() + + subNodePhases := an.GetSubNodePhases() + if subNodePhasesCopy := subNodePhases.DeepCopy(); subNodePhasesCopy != nil { + as.SubNodePhases = *subNodePhasesCopy + } + + subNodeTaskPhases := an.GetSubNodeTaskPhases() + if subNodeTaskPhasesCopy := subNodeTaskPhases.DeepCopy(); subNodeTaskPhasesCopy != nil { + as.SubNodeTaskPhases = *subNodeTaskPhasesCopy + } + + subNodeRetryAttempts := an.GetSubNodeRetryAttempts() + if subNodeRetryAttemptsCopy := subNodeRetryAttempts.DeepCopy(); subNodeRetryAttemptsCopy != nil { + as.SubNodeRetryAttempts = *subNodeRetryAttemptsCopy + } + + subNodeSystemFailures := 
an.GetSubNodeSystemFailures() + if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil { + as.SubNodeSystemFailures = *subNodeSystemFailuresCopy + } } return as }