Skip to content

Commit

Permalink
Graceful failure on ExecutionNotFound error (flyteorg#439)
Browse files Browse the repository at this point in the history
* updated ExecutionNotFound to check failed attempts rather than workflow state

Signed-off-by: Daniel Rammer <[email protected]>

* added unit test to allow continue until maxRetries

Signed-off-by: Daniel Rammer <[email protected]>

* added dropping ExecutionNotFound error when aborting workflow

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jun 6, 2022
1 parent df5c201 commit ba197a0
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 4 deletions.
1 change: 0 additions & 1 deletion pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,6 @@ func TestPropeller_Handle(t *testing.T) {
assert.Equal(t, 0, len(r.Finalizers))
assert.True(t, HasCompletedLabel(r))
})

t.Run("failOnExecutionNotFoundError", func(t *testing.T) {
assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{
ObjectMeta: v1.ObjectMeta{
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E
},
ProducerId: c.clusterID,
})
if err != nil && !eventsErr.IsEventIncompatibleClusterError(err) {
if err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) {
if errors2.IsCausedBy(err, errors.IllegalStateError) {
logger.Debugf(ctx, "Failed to record abort event due to illegal state transition. Ignoring the error. Error: %v", err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r
Code: "Task Aborted",
Message: reason,
}},
}, t.eventConfig); err != nil && !eventsErr.IsEventIncompatibleClusterError(err) {
}, t.eventConfig); err != nil && !eventsErr.IsNotFound(err) && !eventsErr.IsEventIncompatibleClusterError(err) {
// If a prior workflow/node/task execution event has failed because of an invalid cluster error, don't stall the abort
// at this point in the clean-up.
logger.Errorf(ctx, "failed to send event to Admin. error: %s", err.Error())
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (c *workflowExecutor) TransitionToPhase(ctx context.Context, execID *core.W
return nil
}
if (wfEvent.Phase == core.WorkflowExecution_FAILING || wfEvent.Phase == core.WorkflowExecution_FAILED) &&
eventsErr.IsEventIncompatibleClusterError(recordingErr) {
(eventsErr.IsNotFound(recordingErr) || eventsErr.IsEventIncompatibleClusterError(recordingErr)) {
// Don't stall the workflow transition to terminated (so that resources can be cleaned up) since these events
// are being discarded by the back-end anyways.
logger.Infof(ctx, "Failed to record %s workflowEvent, error [%s]. Ignoring this error!", wfEvent.Phase.String(), recordingErr.Error())
Expand Down

0 comments on commit ba197a0

Please sign in to comment.