[Bug] Update resource failures w/ Finalizers set (#423) (#5673)
## Overview
When the [informer cache has stale values](https://github.com/unionai/flyte/blob/1e82352dd95f89630e333fe6105d5fdb5487a24e/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L478), we cannot update the k8s resource while [clearing finalizers](https://github.com/unionai/flyte/blob/1e82352dd95f89630e333fe6105d5fdb5487a24e/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L450) and get `Error: Operation cannot be fulfilled on pods.` The current implementation bubbles the error up, resulting in a system retry. By the next loop the informer cache is up to date and the update can be applied. However, in an ArrayNode with many subnodes executing in parallel, the execution can easily run out of retries.

This update adds a basic retry with exponential backoff that waits for the informer cache to catch up; a simplified sketch follows.
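
For illustration, here is a minimal, self-contained sketch of the retry pattern this change introduces (the real implementation is in `plugin_manager.go`, in the diff below); `clearFinalizersWithRetry` and its `clear` callback are names invented for the sketch. With the new defaults (10 ms base delay, factor 2.0, 10% jitter, 5 steps), the waits between attempts are roughly 10, 20, 40, 80, and 160 ms.

```go
// Sketch of the conflict-retry pattern: back off exponentially so the informer
// cache has time to catch up before the next update attempt.
package sketch

import (
	"context"
	"time"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
)

func clearFinalizersWithRetry(ctx context.Context, clear func(context.Context) error) error {
	backoff := wait.Backoff{
		Duration: 10 * time.Millisecond, // update-base-backoff-duration default
		Factor:   2.0,
		Jitter:   0.1,
		Steps:    5, // update-backoff-retries default
	}

	var lastErr error
	_ = wait.ExponentialBackoff(backoff, func() (bool, error) {
		lastErr = clear(ctx)
		switch {
		case lastErr == nil:
			return true, nil // success: stop retrying
		case k8serrors.IsConflict(lastErr):
			return false, nil // stale informer cache: wait, then try again
		default:
			return true, lastErr // other errors are not retried here
		}
	})
	return lastErr
}
```

Only conflict errors are retried; anything else still bubbles up, so the existing system-retry behavior is unchanged for genuine failures.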

## Test Plan
Ran in dogfood-gcp:
- https://buildkite.com/unionai/managed-cluster-staging-sync/builds/4622 + manually updated the configmap to enable finalizers
- Run without change (https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/fd16ac81fd7b5480fb6f/nodes)
- Run with change (https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/f016a3be7fa304db5a77/nodeId/n0/nodes)
Confirmed in the logs that conflict errors such as this one:
```
{"json":{"exec_id":"f016a3be7fa304db5a77","node":"n0/n42","ns":"development","res_ver":"146129599","routine":"worker-66","src":"plugin_manager.go:455","wf":"flytesnacks:development:tests.flytekit.integration.map_task_issue.wf8"},"level":"warning","msg":"Failed to clear finalizers for Resource with name: development/f016a3be7fa304db5a77-n0-0-n42-0. Error: Operation cannot be fulfilled on pods \"f016a3be7fa304db5a77-n0-0-n42-0\": the object has been modified; please apply your changes to the latest version and try again","ts":"2024-08-17T02:02:48Z"}

```
were logged but did not bubble up, and confirmed that the finalizers were removed:

```
➜  ~ k get pods -n development f016a3be7fa304db5a77-n0-0-n42-0 -o json | grep -i final
INFO[0000] [0] Couldn't find a config file []. Relying on env vars and pflags.
➜  ~
```

## Rollout Plan (if applicable)
- Revert the changes to the customer's config that disabled finalizers

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note: it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed to OSS

## Issue
fixes: https://linear.app/unionai/issue/COR-1558/investigate-why-finalizers-consume-system-retries-in-map-tasks

## Checklist
* [ ] Added tests
* [x] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: pmahindrakar-oss <[email protected]>
pvditt authored and pmahindrakar-oss committed Sep 9, 2024
1 parent 3f98125 commit ace0bf1
Showing 4 changed files with 92 additions and 25 deletions.
@@ -64,6 +64,8 @@ var (
 		DefaultPodTemplateResync: config2.Duration{
 			Duration: 30 * time.Second,
 		},
+		UpdateBaseBackoffDuration: 10,
+		UpdateBackoffRetries:      5,
 	}
 
 	// K8sPluginConfigSection provides a singular top level config section for all plugins.
@@ -206,6 +208,12 @@ type K8sPluginConfig struct {
 
 	// SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events).
 	SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."`
+
+	// UpdateBaseBackoffDuration is the initial delay, in milliseconds, of the exponential backoff used when updating a resource.
+	UpdateBaseBackoffDuration int `json:"update-base-backoff-duration" pflag:",Initial delay in exponential backoff when updating a resource in milliseconds."`
+
+	// UpdateBackoffRetries is the number of retries of the exponential backoff used when updating a resource.
+	UpdateBackoffRetries int `json:"update-backoff-retries" pflag:",Number of retries for exponential backoff when updating a resource."`
 }
 
 // FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot allows running flytekit-less containers

Some generated files are not rendered by default.

79 changes: 54 additions & 25 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
@@ -13,6 +13,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	k8stypes "k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/validation"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/util/workqueue"
@@ -92,9 +93,11 @@ type PluginManager struct {
 	kubeClient pluginsCore.KubeClient
 	metrics    PluginMetrics
 	// Per namespace-resource
-	backOffController    *backoff.Controller
-	resourceLevelMonitor *ResourceLevelMonitor
-	eventWatcher         EventWatcher
+	backOffController         *backoff.Controller
+	resourceLevelMonitor      *ResourceLevelMonitor
+	eventWatcher              EventWatcher
+	updateBaseBackoffDuration int
+	updateBackoffRetries      int
 }
 
 func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) {
@@ -463,25 +466,48 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu
 	}
 	nsName = k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
 
+	retryBackoff := wait.Backoff{
+		Duration: time.Duration(e.updateBaseBackoffDuration) * time.Millisecond,
+		Factor:   2.0,
+		Jitter:   0.1,
+		Steps:    e.updateBackoffRetries,
+	}
+
 	// Attempt to cleanup finalizers so that the object may be deleted/garbage collected. We try to clear them for all
 	// objects, regardless of whether or not InjectFinalizer is configured, to handle all cases where InjectFinalizer is
 	// enabled/disabled during object execution.
-	if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
-		if isK8sObjectNotExists(err) {
-			return nil
-		}
-		// This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a
-		// "Pod does not exist" error. This should be retried using the retry policy.
-		logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err)
-		return err
-	}
-
-	// This must happen after sending the admin event. It's safe against partial failures because if the event failed, we will
-	// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
-	// the same event (idempotent) and then come here again...
-	err = e.clearFinalizers(ctx, o)
-	if err != nil {
-		errs.Append(err)
+	var lastErr error
+	_ = wait.ExponentialBackoff(retryBackoff, func() (bool, error) {
+		lastErr = nil
+		if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
+			if isK8sObjectNotExists(err) {
+				return true, nil
+			}
+			lastErr = err
+			// This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a
+			// "Pod does not exist" error. This should be retried using the retry policy.
+			logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err)
+			return true, err
+		}
+
+		// This must happen after sending the admin event. It's safe against partial failures because if the event failed, we will
+		// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
+		// the same event (idempotent) and then come here again...
+		if err := e.clearFinalizers(ctx, o); err != nil {
+			lastErr = err
+			// Retry if there is a conflict, in case the informer cache is out of sync.
+			if k8serrors.IsConflict(err) {
+				logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v. Retrying..", nsName, err)
+				return false, nil
+			}
+			logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v", nsName, err)
+			return true, err
+		}
+		return true, nil
+	})
+
+	if lastErr != nil {
+		errs.Append(lastErr)
 	}
 
 	// If we should delete the resource when finalize is called, do a best effort delete.
@@ -630,8 +656,9 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry
 		return nil, err
 	}
 
+	k8sConfig := config.GetK8sPluginConfig()
 	var eventWatcher EventWatcher
-	if config.GetK8sPluginConfig().SendObjectEvents {
+	if k8sConfig.SendObjectEvents {
 		eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset)
 		if err != nil {
 			return nil, err
@@ -645,13 +672,15 @@
 	rm.RunCollectorOnce(ctx)
 
 	return &PluginManager{
-		id:                   entry.ID,
-		plugin:               entry.Plugin,
-		resourceToWatch:      entry.ResourceToWatch,
-		metrics:              newPluginMetrics(metricsScope),
-		kubeClient:           kubeClient,
-		resourceLevelMonitor: rm,
-		eventWatcher:         eventWatcher,
+		id:                        entry.ID,
+		plugin:                    entry.Plugin,
+		resourceToWatch:           entry.ResourceToWatch,
+		metrics:                   newPluginMetrics(metricsScope),
+		kubeClient:                kubeClient,
+		resourceLevelMonitor:      rm,
+		eventWatcher:              eventWatcher,
+		updateBaseBackoffDuration: k8sConfig.UpdateBaseBackoffDuration,
+		updateBackoffRetries:      k8sConfig.UpdateBackoffRetries,
 	}, nil
 }

