From ba197a02c98e474b8ad1dba8862ec04810809aa2 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Mon, 6 Jun 2022 17:33:28 -0500 Subject: [PATCH] Graceful failure on ExecutionNotFound error (#439) * updated ExecutionNotFound to check failed attempts rather than workflow state Signed-off-by: Daniel Rammer * added unit test to allow continue until maxRetries Signed-off-by: Daniel Rammer * added dropping ExecutionNotFound error when aborting workflow Signed-off-by: Daniel Rammer * fixed unit tests Signed-off-by: Daniel Rammer --- pkg/controller/handler_test.go | 1 - pkg/controller/nodes/executor.go | 2 +- pkg/controller/nodes/task/handler.go | 2 +- pkg/controller/workflow/executor.go | 2 +- 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/controller/handler_test.go b/pkg/controller/handler_test.go index ba8c5645e..e957aeb51 100644 --- a/pkg/controller/handler_test.go +++ b/pkg/controller/handler_test.go @@ -462,7 +462,6 @@ func TestPropeller_Handle(t *testing.T) { assert.Equal(t, 0, len(r.Finalizers)) assert.True(t, HasCompletedLabel(r)) }) - t.Run("failOnExecutionNotFoundError", func(t *testing.T) { assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index c4ca81e77..12b7eb5de 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -1057,7 +1057,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E }, ProducerId: c.clusterID, }) - if err != nil && !eventsErr.IsEventIncompatibleClusterError(err) { + if err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) { if errors2.IsCausedBy(err, errors.IllegalStateError) { logger.Debugf(ctx, "Failed to record abort event due to illegal state transition. Ignoring the error. Error: %v", err) } else { diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index ebe9cebd5..9c9002581 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -761,7 +761,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r Code: "Task Aborted", Message: reason, }}, - }, t.eventConfig); err != nil && !eventsErr.IsEventIncompatibleClusterError(err) { + }, t.eventConfig); err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) { // If a prior workflow/node/task execution event has failed because of an invalid cluster error, don't stall the abort // at this point in the clean-up. logger.Errorf(ctx, "failed to send event to Admin. error: %s", err.Error()) diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go index e3d8e8131..92a870c1c 100644 --- a/pkg/controller/workflow/executor.go +++ b/pkg/controller/workflow/executor.go @@ -342,7 +342,7 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W return nil } if (wfEvent.Phase == core.WorkflowExecution_FAILING || wfEvent.Phase == core.WorkflowExecution_FAILED) && - eventsErr.IsEventIncompatibleClusterError(recordingErr) { + (eventsErr.IsNotFound(recordingErr) || eventsErr.IsEventIncompatibleClusterError(recordingErr)) { // Don't stall the workflow transition to terminated (so that resources can be cleaned up) since these events // are being discarded by the back-end anyways. logger.Infof(ctx, "Failed to record %s workflowEvent, error [%s]. Ignoring this error!", wfEvent.Phase.String(), recordingErr.Error())