From 302678cf91348fa034cb9d736e1a2d04b86b44cc Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Tue, 11 Apr 2023 13:06:04 -0700 Subject: [PATCH 1/2] Check for TerminateExecution error and eat Precondition status Signed-off-by: Haytham Abuelfutuh --- .../nodes/subworkflow/launchplan/admin.go | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin.go b/pkg/controller/nodes/subworkflow/launchplan/admin.go index 251bd207d..05385a1f7 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -2,14 +2,16 @@ package launchplan import ( "context" + "errors" "fmt" + evtErr "github.com/flyteorg/flytepropeller/events/errors" "time" "github.com/flyteorg/flytestdlib/cache" "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" - "github.com/flyteorg/flytestdlib/errors" + stdErr "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flytestdlib/logger" @@ -61,11 +63,11 @@ func (a *adminLaunchPlanExecutor) handleLaunchError(ctx context.Context, isRecov logger.Errorf(ctx, "Failed to add ExecID [%v] to auto refresh cache", executionID) } - return errors.Wrapf(RemoteErrorAlreadyExists, err, "ExecID %s already exists", executionID.Name) + return stdErr.Wrapf(RemoteErrorAlreadyExists, err, "ExecID %s already exists", executionID.Name) case codes.DataLoss, codes.DeadlineExceeded, codes.Internal, codes.Unknown, codes.Canceled: - return errors.Wrapf(RemoteErrorSystem, err, "failed to launch workflow [%s], system error", launchPlanRef.Name) + return stdErr.Wrapf(RemoteErrorSystem, err, "failed to launch workflow [%s], system error", launchPlanRef.Name) default: - return errors.Wrapf(RemoteErrorUser, err, "failed to launch workflow") + return stdErr.Wrapf(RemoteErrorUser, err, "failed to launch workflow") } } @@ -154,9 +156,9 @@ func (a *adminLaunchPlanExecutor) GetLaunchPlan(ctx context.Context, launchPlanR lp, err := a.adminClient.GetLaunchPlan(ctx, &getObjectRequest) if err != nil { if status.Code(err) == codes.NotFound { - return nil, errors.Wrapf(RemoteErrorNotFound, err, "No launch plan retrieved from Admin") + return nil, stdErr.Wrapf(RemoteErrorNotFound, err, "No launch plan retrieved from Admin") } - return nil, errors.Wrapf(RemoteErrorSystem, err, "Could not fetch launch plan definition from Admin") + return nil, stdErr.Wrapf(RemoteErrorSystem, err, "Could not fetch launch plan definition from Admin") } return lp, nil @@ -172,7 +174,16 @@ func (a *adminLaunchPlanExecutor) Kill(ctx context.Context, executionID *core.Wo if status.Code(err) == codes.NotFound { return nil } - return errors.Wrapf(RemoteErrorSystem, err, "system error") + + err = evtErr.WrapError(err) + eventErr := &evtErr.EventError{} + if errors.As(err, eventErr) { + if eventErr.Code == evtErr.EventAlreadyInTerminalStateError { + return nil + } + } + + return stdErr.Wrapf(RemoteErrorSystem, err, "system error") } return nil } @@ -207,12 +218,12 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc res, err := a.adminClient.GetExecution(ctx, req) if err != nil { - // TODO: Define which error codes are system errors (and return the error) vs user errors. + // TODO: Define which error codes are system errors (and return the error) vs user stdErr. if status.Code(err) == codes.NotFound { - err = errors.Wrapf(RemoteErrorNotFound, err, "execID [%s] not found on remote", exec.WorkflowExecutionIdentifier.Name) + err = stdErr.Wrapf(RemoteErrorNotFound, err, "execID [%s] not found on remote", exec.WorkflowExecutionIdentifier.Name) } else { - err = errors.Wrapf(RemoteErrorSystem, err, "system error") + err = stdErr.Wrapf(RemoteErrorSystem, err, "system error") } resp = append(resp, cache.ItemSyncResponse{ From dc5764868d282e18087e1d6d4ad7a2f11fcd78ee Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 13 Apr 2023 19:02:25 -0700 Subject: [PATCH 2/2] lint Signed-off-by: Haytham Abuelfutuh --- pkg/controller/nodes/subworkflow/launchplan/admin.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin.go b/pkg/controller/nodes/subworkflow/launchplan/admin.go index 05385a1f7..0329f3aef 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -4,9 +4,10 @@ import ( "context" "errors" "fmt" - evtErr "github.com/flyteorg/flytepropeller/events/errors" "time" + evtErr "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flytestdlib/cache" "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue"