diff --git a/pkg/controller/nodes/subworkflow/launchplan/admin.go b/pkg/controller/nodes/subworkflow/launchplan/admin.go index 251bd207d..0329f3aef 100644 --- a/pkg/controller/nodes/subworkflow/launchplan/admin.go +++ b/pkg/controller/nodes/subworkflow/launchplan/admin.go @@ -2,14 +2,17 @@ package launchplan import ( "context" + "errors" "fmt" "time" + evtErr "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flytestdlib/cache" "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" - "github.com/flyteorg/flytestdlib/errors" + stdErr "github.com/flyteorg/flytestdlib/errors" "github.com/flyteorg/flytestdlib/logger" @@ -61,11 +64,11 @@ func (a *adminLaunchPlanExecutor) handleLaunchError(ctx context.Context, isRecov logger.Errorf(ctx, "Failed to add ExecID [%v] to auto refresh cache", executionID) } - return errors.Wrapf(RemoteErrorAlreadyExists, err, "ExecID %s already exists", executionID.Name) + return stdErr.Wrapf(RemoteErrorAlreadyExists, err, "ExecID %s already exists", executionID.Name) case codes.DataLoss, codes.DeadlineExceeded, codes.Internal, codes.Unknown, codes.Canceled: - return errors.Wrapf(RemoteErrorSystem, err, "failed to launch workflow [%s], system error", launchPlanRef.Name) + return stdErr.Wrapf(RemoteErrorSystem, err, "failed to launch workflow [%s], system error", launchPlanRef.Name) default: - return errors.Wrapf(RemoteErrorUser, err, "failed to launch workflow") + return stdErr.Wrapf(RemoteErrorUser, err, "failed to launch workflow") } } @@ -154,9 +157,9 @@ func (a *adminLaunchPlanExecutor) GetLaunchPlan(ctx context.Context, launchPlanR lp, err := a.adminClient.GetLaunchPlan(ctx, &getObjectRequest) if err != nil { if status.Code(err) == codes.NotFound { - return nil, errors.Wrapf(RemoteErrorNotFound, err, "No launch plan retrieved from Admin") + return nil, stdErr.Wrapf(RemoteErrorNotFound, err, "No launch plan retrieved from Admin") } - return nil, errors.Wrapf(RemoteErrorSystem, err, "Could not fetch launch plan definition from Admin") + return nil, stdErr.Wrapf(RemoteErrorSystem, err, "Could not fetch launch plan definition from Admin") } return lp, nil @@ -172,7 +175,16 @@ func (a *adminLaunchPlanExecutor) Kill(ctx context.Context, executionID *core.Wo if status.Code(err) == codes.NotFound { return nil } - return errors.Wrapf(RemoteErrorSystem, err, "system error") + + err = evtErr.WrapError(err) + eventErr := &evtErr.EventError{} + if errors.As(err, eventErr) { + if eventErr.Code == evtErr.EventAlreadyInTerminalStateError { + return nil + } + } + + return stdErr.Wrapf(RemoteErrorSystem, err, "system error") } return nil } @@ -207,12 +219,12 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc res, err := a.adminClient.GetExecution(ctx, req) if err != nil { - // TODO: Define which error codes are system errors (and return the error) vs user errors. + // TODO: Define which error codes are system errors (and return the error) vs user stdErr. if status.Code(err) == codes.NotFound { - err = errors.Wrapf(RemoteErrorNotFound, err, "execID [%s] not found on remote", exec.WorkflowExecutionIdentifier.Name) + err = stdErr.Wrapf(RemoteErrorNotFound, err, "execID [%s] not found on remote", exec.WorkflowExecutionIdentifier.Name) } else { - err = errors.Wrapf(RemoteErrorSystem, err, "system error") + err = stdErr.Wrapf(RemoteErrorSystem, err, "system error") } resp = append(resp, cache.ItemSyncResponse{