diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go index aab034224d7..218b0455882 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -316,6 +316,27 @@ func (in *ArrayNodeStatus) SetTaskPhaseVersion(taskPhaseVersion uint32) { } } +func (in *ArrayNodeStatus) DeepCopyInto(out *ArrayNodeStatus) { + *out = *in + out.MutableStruct = in.MutableStruct + + if in.ExecutionError != nil { + in, out := &in.ExecutionError, &out.ExecutionError + *out = new(core.ExecutionError) + *out = *in + } +} + +func (in *ArrayNodeStatus) DeepCopy() *ArrayNodeStatus { + if in == nil { + return nil + } + + out := &ArrayNodeStatus{} + in.DeepCopyInto(out) + return out +} + type NodeStatus struct { MutableStruct Phase NodePhase `json:"phase,omitempty"` diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go index febbca733c2..95cac582b81 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/zz_generated.deepcopy.go @@ -548,6 +548,10 @@ func (in *NodeStatus) DeepCopyInto(out *NodeStatus) { *out = new(DynamicNodeStatus) (*in).DeepCopyInto(*out) } + if in.ArrayNodeStatus != nil { + in, out := &in.ArrayNodeStatus, &out.ArrayNodeStatus + *out = (*in).DeepCopy() + } if in.Error != nil { in, out := &in.Error, &out.Error *out = (*in).DeepCopy() diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 81489da17c8..0e3da1bd118 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -278,7 +278,8 @@ const ( type EventConfig struct { RawOutputPolicy RawOutputPolicy `json:"raw-output-policy" pflag:",How output data should be passed along in execution events."` FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."` - ErrorOnAlreadyExists bool `json:"error-on-already-exists" pflag:",Whether to return an error when an event already exists."` + // only meant to be overridden for certain node types that have different eventing behavior such as ArrayNode + ErrorOnAlreadyExists bool `json:"-"` } // ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index d2dc0971ffd..858fc8a8ba8 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -100,7 +100,7 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "max-streak-length"), defaultConfig.MaxStreakLength, "Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "event-config.raw-output-policy"), defaultConfig.EventConfig.RawOutputPolicy, "How output data should be passed along in execution events.") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.fallback-to-output-reference"), defaultConfig.EventConfig.FallbackToOutputReference, "Whether output data should be sent by reference when it is too large to be sent inline in execution events.") - cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.error-on-already-exists"), defaultConfig.EventConfig.ErrorOnAlreadyExists, "Whether to return an error when an event already exists.") + cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "event-config.-"), defaultConfig.EventConfig.ErrorOnAlreadyExists, "") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "include-shard-key-label"), defaultConfig.IncludeShardKeyLabel, "Include the specified shard key label in the k8s FlyteWorkflow CRD label selector") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "exclude-shard-key-label"), defaultConfig.ExcludeShardKeyLabel, "Exclude the specified shard key label from the k8s FlyteWorkflow CRD label selector") cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "include-project-label"), defaultConfig.IncludeProjectLabel, "Include the specified project label in the k8s FlyteWorkflow CRD label selector") diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index 66a14381af6..27e7b76efac 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -799,13 +799,13 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_event-config.error-on-already-exists", func(t *testing.T) { + t.Run("Test_event-config.-", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("event-config.error-on-already-exists", testValue) - if vBool, err := cmdFlags.GetBool("event-config.error-on-already-exists"); err == nil { + cmdFlags.Set("event-config.-", testValue) + if vBool, err := cmdFlags.GetBool("event-config.-"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vBool), &actual.EventConfig.ErrorOnAlreadyExists) } else { diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index d27b412c1f5..648d70e36c4 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -539,7 +539,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -559,7 +563,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { subNodeTransitions: []handler.Transition{ handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseQueued, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, @@ -580,7 +588,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -601,7 +613,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { subNodeTransitions: []handler.Transition{ handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseQueued, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, @@ -619,8 +635,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { core.PhaseUndefined, core.PhaseUndefined, }, - subNodeTransitions: []handler.Transition{}, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + subNodeTransitions: []handler.Transition{}, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{}, @@ -642,7 +662,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 1, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -663,7 +687,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseSucceeded, + v1alpha1.NodePhaseSucceeded, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_SUCCEEDED}, @@ -684,7 +712,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseSucceeding, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseSucceeded, + v1alpha1.NodePhaseFailed, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_SUCCEEDED, idlcore.TaskExecution_FAILED}, @@ -704,7 +736,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(0, "", "", &handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseFailing, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseFailed, + v1alpha1.NodePhaseSucceeded, + }, expectedTaskPhaseVersion: 0, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_FAILED, idlcore.TaskExecution_SUCCEEDED}, @@ -724,7 +760,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, - expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseRunning, + v1alpha1.NodePhaseRunning, + }, expectedTaskPhaseVersion: 2, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, @@ -749,6 +789,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, useFakeEventRecorder: true, eventRecorderFailures: 5, @@ -771,6 +815,10 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{})), }, + expectedArrayNodeSubPhases: []v1alpha1.NodePhase{ + v1alpha1.NodePhaseQueued, + v1alpha1.NodePhaseQueued, + }, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, useFakeEventRecorder: true, eventRecorderError: fmt.Errorf("err"), diff --git a/flytepropeller/pkg/controller/nodes/node_state_manager.go b/flytepropeller/pkg/controller/nodes/node_state_manager.go index 91cf1f26799..a9ead9afc30 100644 --- a/flytepropeller/pkg/controller/nodes/node_state_manager.go +++ b/flytepropeller/pkg/controller/nodes/node_state_manager.go @@ -160,11 +160,27 @@ func (n nodeStateManager) GetArrayNodeState() handler.ArrayNodeState { if an != nil { as.Phase = an.GetArrayNodePhase() as.Error = an.GetExecutionError() - as.SubNodePhases = an.GetSubNodePhases() - as.SubNodeTaskPhases = an.GetSubNodeTaskPhases() - as.SubNodeRetryAttempts = an.GetSubNodeRetryAttempts() - as.SubNodeSystemFailures = an.GetSubNodeSystemFailures() as.TaskPhaseVersion = an.GetTaskPhaseVersion() + + subNodePhases := an.GetSubNodePhases() + if subNodePhasesCopy := subNodePhases.DeepCopy(); subNodePhasesCopy != nil { + as.SubNodePhases = *subNodePhasesCopy + } + + subNodeTaskPhases := an.GetSubNodeTaskPhases() + if subNodeTaskPhasesCopy := subNodeTaskPhases.DeepCopy(); subNodeTaskPhasesCopy != nil { + as.SubNodeTaskPhases = *subNodeTaskPhasesCopy + } + + subNodeRetryAttempts := an.GetSubNodeRetryAttempts() + if subNodeRetryAttemptsCopy := subNodeRetryAttempts.DeepCopy(); subNodeRetryAttemptsCopy != nil { + as.SubNodeRetryAttempts = *subNodeRetryAttemptsCopy + } + + subNodeSystemFailures := an.GetSubNodeSystemFailures() + if subNodeSystemFailuresCopy := subNodeSystemFailures.DeepCopy(); subNodeSystemFailuresCopy != nil { + as.SubNodeSystemFailures = *subNodeSystemFailuresCopy + } } return as }