Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear finalizer on abort for arrays #5221

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions flyteplugins/go/tasks/plugins/array/k8s/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,37 +105,65 @@ func abortSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Confi
deleteResource := true
abortOverride, hasAbortOverride := plugin.(k8s.PluginAbortOverride)

resourceToFinalize := o
var behavior k8s.AbortBehavior

if hasAbortOverride {
behavior, err = abortOverride.OnAbort(ctx, stCtx, o)
deleteResource = err == nil && behavior.DeleteResource
if err == nil && behavior.Resource != nil {
resourceToFinalize = behavior.Resource
o = behavior.Resource
}
}

if err != nil {
} else if deleteResource {
err = kubeClient.GetClient().Delete(ctx, resourceToFinalize)
// In InjectFinalizer is on, it means we may have added the finalizers when we launched this resource. Attempt to
// clear them to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// after the resource was created, we will not find any finalizers to clear and the object may have already been
// deleted at this point. Therefore, account for these cases and do not consider them errors.
if config.GetK8sPluginConfig().InjectFinalizer {
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
if err := kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
if isK8sObjectNotExists(err) {
return nil
}
// This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a
// Pod does not exist error. This should be retried using the retry policy
logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err)
return err
}

// This must happen after sending admin event. It's safe against partial failures because if the event failed, we will
// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
// the same event (idempotent) and then come here again...
err = clearFinalizers(ctx, o, kubeClient)
if err != nil {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
o.GetNamespace(), o.GetName(), err)
return err
}

err = kubeClient.GetClient().Delete(ctx, o)

}

} else {
if behavior.Patch != nil && behavior.Update == nil {
err = kubeClient.GetClient().Patch(ctx, resourceToFinalize, behavior.Patch.Patch, behavior.Patch.Options...)
err = kubeClient.GetClient().Patch(ctx, o, behavior.Patch.Patch, behavior.Patch.Options...)
} else if behavior.Patch == nil && behavior.Update != nil {
err = kubeClient.GetClient().Update(ctx, resourceToFinalize, behavior.Update.Options...)
err = kubeClient.GetClient().Update(ctx, o, behavior.Update.Options...)
} else {
err = errors.Errorf(errors.RuntimeFailure, "AbortBehavior for resource %v must specify either a Patch and an Update operation if Delete is set to false. Only one can be supplied.", resourceToFinalize.GetName())
err = errors.Errorf(errors.RuntimeFailure, "AbortBehavior for resource %v must specify either a Patch and an Update operation if Delete is set to false. Only one can be supplied.", o.GetName())
}
if behavior.DeleteOnErr && err != nil {
logger.Warningf(ctx, "Failed to apply AbortBehavior for resource %v with error %v. Will attempt to delete resource.", resourceToFinalize.GetName(), err)
err = kubeClient.GetClient().Delete(ctx, resourceToFinalize)
logger.Warningf(ctx, "Failed to apply AbortBehavior for resource %v with error %v. Will attempt to delete resource.", o.GetName(), err)
err = kubeClient.GetClient().Delete(ctx, o)
}
}

if err != nil && !isK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err)
logger.Warningf(ctx, "Failed to delete Resource with name: %v/%v. Error: %v",
o.GetNamespace(), o.GetName(), err)
return err
}

Expand Down
Loading