Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Check for TerminateExecution error and eat Precondition status #553

Merged
merged 4 commits into from
Apr 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions pkg/controller/nodes/subworkflow/launchplan/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package launchplan

import (
"context"
"errors"
"fmt"
"time"

evtErr "github.com/flyteorg/flytepropeller/events/errors"

"github.com/flyteorg/flytestdlib/cache"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"

"github.com/flyteorg/flytestdlib/errors"
stdErr "github.com/flyteorg/flytestdlib/errors"

"github.com/flyteorg/flytestdlib/logger"

Expand Down Expand Up @@ -61,11 +64,11 @@ func (a *adminLaunchPlanExecutor) handleLaunchError(ctx context.Context, isRecov
logger.Errorf(ctx, "Failed to add ExecID [%v] to auto refresh cache", executionID)
}

return errors.Wrapf(RemoteErrorAlreadyExists, err, "ExecID %s already exists", executionID.Name)
return stdErr.Wrapf(RemoteErrorAlreadyExists, err, "ExecID %s already exists", executionID.Name)
case codes.DataLoss, codes.DeadlineExceeded, codes.Internal, codes.Unknown, codes.Canceled:
return errors.Wrapf(RemoteErrorSystem, err, "failed to launch workflow [%s], system error", launchPlanRef.Name)
return stdErr.Wrapf(RemoteErrorSystem, err, "failed to launch workflow [%s], system error", launchPlanRef.Name)
default:
return errors.Wrapf(RemoteErrorUser, err, "failed to launch workflow")
return stdErr.Wrapf(RemoteErrorUser, err, "failed to launch workflow")
}
}

Expand Down Expand Up @@ -154,9 +157,9 @@ func (a *adminLaunchPlanExecutor) GetLaunchPlan(ctx context.Context, launchPlanR
lp, err := a.adminClient.GetLaunchPlan(ctx, &getObjectRequest)
if err != nil {
if status.Code(err) == codes.NotFound {
return nil, errors.Wrapf(RemoteErrorNotFound, err, "No launch plan retrieved from Admin")
return nil, stdErr.Wrapf(RemoteErrorNotFound, err, "No launch plan retrieved from Admin")
}
return nil, errors.Wrapf(RemoteErrorSystem, err, "Could not fetch launch plan definition from Admin")
return nil, stdErr.Wrapf(RemoteErrorSystem, err, "Could not fetch launch plan definition from Admin")
}

return lp, nil
Expand All @@ -172,7 +175,16 @@ func (a *adminLaunchPlanExecutor) Kill(ctx context.Context, executionID *core.Wo
if status.Code(err) == codes.NotFound {
return nil
}
return errors.Wrapf(RemoteErrorSystem, err, "system error")

err = evtErr.WrapError(err)
eventErr := &evtErr.EventError{}
if errors.As(err, eventErr) {
if eventErr.Code == evtErr.EventAlreadyInTerminalStateError {
return nil
}
}

return stdErr.Wrapf(RemoteErrorSystem, err, "system error")
}
return nil
}
Expand Down Expand Up @@ -207,12 +219,12 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc

res, err := a.adminClient.GetExecution(ctx, req)
if err != nil {
// TODO: Define which error codes are system errors (and return the error) vs user errors.
// TODO: Define which error codes are system errors (and return the error) vs user errors.

if status.Code(err) == codes.NotFound {
err = errors.Wrapf(RemoteErrorNotFound, err, "execID [%s] not found on remote", exec.WorkflowExecutionIdentifier.Name)
err = stdErr.Wrapf(RemoteErrorNotFound, err, "execID [%s] not found on remote", exec.WorkflowExecutionIdentifier.Name)
} else {
err = errors.Wrapf(RemoteErrorSystem, err, "system error")
err = stdErr.Wrapf(RemoteErrorSystem, err, "system error")
}

resp = append(resp, cache.ItemSyncResponse{
Expand Down