From 9f4306a9bc429e39d64c73f23f6fdbd38709229a Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 5 Jun 2024 16:48:37 -0700 Subject: [PATCH 1/3] set updated array node phase for task exec event Signed-off-by: Paul Dittamo --- .../pkg/controller/nodes/array/handler.go | 28 +++++++++++-------- .../controller/nodes/array/handler_test.go | 2 ++ 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 0f9e95f19b..7068b5a432 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -564,17 +564,7 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // report to admin through a TaskExecutionEvent. if eventRecorder.finalizeRequired(ctx) { // determine task phase from ArrayNodePhase - taskPhase := idlcore.TaskExecution_UNDEFINED - switch currentArrayNodePhase { - case v1alpha1.ArrayNodePhaseNone: - taskPhase = idlcore.TaskExecution_QUEUED - case v1alpha1.ArrayNodePhaseExecuting: - taskPhase = idlcore.TaskExecution_RUNNING - case v1alpha1.ArrayNodePhaseSucceeding: - taskPhase = idlcore.TaskExecution_SUCCEEDED - case v1alpha1.ArrayNodePhaseFailing: - taskPhase = idlcore.TaskExecution_FAILED - } + taskPhase := mapTaskPhase(arrayNodeState.Phase) // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise // increment it if we detect any changes in subNode state. @@ -641,6 +631,22 @@ func New(nodeExecutor interfaces.Node, eventConfig *config.EventConfig, scope pr }, nil } +func mapTaskPhase(arrayNodePhase v1alpha1.ArrayNodePhase) idlcore.TaskExecution_Phase { + taskPhase := idlcore.TaskExecution_UNDEFINED + switch arrayNodePhase { + case v1alpha1.ArrayNodePhaseNone: + taskPhase = idlcore.TaskExecution_QUEUED + case v1alpha1.ArrayNodePhaseExecuting: + taskPhase = idlcore.TaskExecution_RUNNING + case v1alpha1.ArrayNodePhaseSucceeding: + taskPhase = idlcore.TaskExecution_SUCCEEDED + case v1alpha1.ArrayNodePhaseFailing: + taskPhase = idlcore.TaskExecution_FAILED + } + + return taskPhase +} + // buildArrayNodeContext creates a custom environment to execute the ArrayNode subnode. This is uniquely required for // the arrayNodeHandler because we require the same node execution entrypoint (ie. recursiveNodeExecutor.RecursiveNodeHandler) // but need many different execution details, for example setting input values as a singular item rather than a collection, diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index ba0815fee6..90fdac542b 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -442,6 +442,7 @@ func TestHandleArrayNodePhaseNone(t *testing.T) { if len(test.expectedExternalResourcePhases) > 0 { assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) + assert.Equal(t, mapTaskPhase(test.expectedArrayNodePhase), eventRecorder.taskExecutionEvents[0].Phase) externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) @@ -753,6 +754,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { if len(test.expectedExternalResourcePhases) > 0 { assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) + assert.Equal(t, mapTaskPhase(test.expectedArrayNodePhase), eventRecorder.taskExecutionEvents[0].Phase) externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) From 857f0e15daeb594ef2292161f9bfcb753105924e Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 5 Jun 2024 22:54:07 -0700 Subject: [PATCH 2/3] Revert "set updated array node phase for task exec event" This reverts commit 9f4306a9bc429e39d64c73f23f6fdbd38709229a. Signed-off-by: Paul Dittamo --- .../pkg/controller/nodes/array/handler.go | 28 ++++++++----------- .../controller/nodes/array/handler_test.go | 2 -- 2 files changed, 11 insertions(+), 19 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 7068b5a432..0f9e95f19b 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -564,7 +564,17 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // report to admin through a TaskExecutionEvent. if eventRecorder.finalizeRequired(ctx) { // determine task phase from ArrayNodePhase - taskPhase := mapTaskPhase(arrayNodeState.Phase) + taskPhase := idlcore.TaskExecution_UNDEFINED + switch currentArrayNodePhase { + case v1alpha1.ArrayNodePhaseNone: + taskPhase = idlcore.TaskExecution_QUEUED + case v1alpha1.ArrayNodePhaseExecuting: + taskPhase = idlcore.TaskExecution_RUNNING + case v1alpha1.ArrayNodePhaseSucceeding: + taskPhase = idlcore.TaskExecution_SUCCEEDED + case v1alpha1.ArrayNodePhaseFailing: + taskPhase = idlcore.TaskExecution_FAILED + } // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise // increment it if we detect any changes in subNode state. @@ -631,22 +641,6 @@ func New(nodeExecutor interfaces.Node, eventConfig *config.EventConfig, scope pr }, nil } -func mapTaskPhase(arrayNodePhase v1alpha1.ArrayNodePhase) idlcore.TaskExecution_Phase { - taskPhase := idlcore.TaskExecution_UNDEFINED - switch arrayNodePhase { - case v1alpha1.ArrayNodePhaseNone: - taskPhase = idlcore.TaskExecution_QUEUED - case v1alpha1.ArrayNodePhaseExecuting: - taskPhase = idlcore.TaskExecution_RUNNING - case v1alpha1.ArrayNodePhaseSucceeding: - taskPhase = idlcore.TaskExecution_SUCCEEDED - case v1alpha1.ArrayNodePhaseFailing: - taskPhase = idlcore.TaskExecution_FAILED - } - - return taskPhase -} - // buildArrayNodeContext creates a custom environment to execute the ArrayNode subnode. This is uniquely required for // the arrayNodeHandler because we require the same node execution entrypoint (ie. recursiveNodeExecutor.RecursiveNodeHandler) // but need many different execution details, for example setting input values as a singular item rather than a collection, diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 90fdac542b..ba0815fee6 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -442,7 +442,6 @@ func TestHandleArrayNodePhaseNone(t *testing.T) { if len(test.expectedExternalResourcePhases) > 0 { assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) - assert.Equal(t, mapTaskPhase(test.expectedArrayNodePhase), eventRecorder.taskExecutionEvents[0].Phase) externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) @@ -754,7 +753,6 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { if len(test.expectedExternalResourcePhases) > 0 { assert.Equal(t, 1, len(eventRecorder.taskExecutionEvents)) - assert.Equal(t, mapTaskPhase(test.expectedArrayNodePhase), eventRecorder.taskExecutionEvents[0].Phase) externalResources := eventRecorder.taskExecutionEvents[0].Metadata.GetExternalResources() assert.Equal(t, len(test.expectedExternalResourcePhases), len(externalResources)) From d47547e64f270e3586ca3a70350c89175762624d Mon Sep 17 00:00:00 2001 From: Paul Dittamo Date: Wed, 5 Jun 2024 22:58:56 -0700 Subject: [PATCH 3/3] reset taskPhaseVersion due to nodephase change after emitting event Signed-off-by: Paul Dittamo --- flytepropeller/pkg/controller/nodes/array/handler.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 0f9e95f19b..cdf77e1875 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -576,11 +576,8 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu taskPhase = idlcore.TaskExecution_FAILED } - // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0, otherwise - // increment it if we detect any changes in subNode state. - if currentArrayNodePhase != arrayNodeState.Phase { - arrayNodeState.TaskPhaseVersion = 0 - } else if incrementTaskPhaseVersion { + // increment taskPhaseVersion if we detect any changes in subNode state. + if incrementTaskPhaseVersion { arrayNodeState.TaskPhaseVersion = arrayNodeState.TaskPhaseVersion + 1 } @@ -588,6 +585,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu logger.Errorf(ctx, "ArrayNode event recording failed: [%s]", err.Error()) return handler.UnknownTransition, err } + + // if the ArrayNode phase has changed we need to reset the taskPhaseVersion to 0 + if currentArrayNodePhase != arrayNodeState.Phase { + arrayNodeState.TaskPhaseVersion = 0 + } } // update array node status