From 7866c319d00734f10732fd493d60512500b90317 Mon Sep 17 00:00:00 2001 From: Paul Dittamo <37558497+pvditt@users.noreply.github.com> Date: Wed, 21 Aug 2024 09:08:09 -0700 Subject: [PATCH] [Bug] Update resource failures w/ Finalizers set (#423) (#5673) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Overview when [informer cache has stale values](https://github.com/unionai/flyte/blob/1e82352dd95f89630e333fe6105d5fdb5487a24e/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L478), we cannot update the k8s resource when [clearing finalizers](https://github.com/unionai/flyte/blob/1e82352dd95f89630e333fe6105d5fdb5487a24e/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L450) and get `Error: Operation cannot be fulfilled on pods.` The current implementation bubbles up the error resulting in a system retry. By the next loop, the informer cache is up to date and the update is able to be applied. However, in an ArrayNode with many subnodes getting executed in parallel, the execution can easily run out of retries. This update adds a basic retry with exponential backoff to wait for the informer cache to get up to date. ## Test Plan Ran in dogfood-gcp - https://buildkite.com/unionai/managed-cluster-staging-sync/builds/4622 + manually updated configmap to enabled finalizers - Run without change (https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/fd16ac81fd7b5480fb6f/nodes) - Run with change (https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/f016a3be7fa304db5a77/nodeId/n0/nodes) confirmed in logs that conflict errors: ``` {"json":{"exec_id":"f016a3be7fa304db5a77","node":"n0/n42","ns":"development","res_ver":"146129599","routine":"worker-66","src":"plugin_manager.go:455","wf":"flytesnacks:development:tests.flytekit.integration.map_task_issue.wf8"},"level":"warning","msg":"Failed to clear finalizers for Resource with name: development/f016a3be7fa304db5a77-n0-0-n42-0. Error: Operation cannot be fulfilled on pods \"f016a3be7fa304db5a77-n0-0-n42-0\": the object has been modified; please apply your changes to the latest version and try again","ts":"2024-08-17T02:02:48Z"} ``` did not bubble up + confirmed finalizers were removed: ``` ➜ ~ k get pods -n development f016a3be7fa304db5a77-n0-0-n42-0 -o json | grep -i final INFO[0000] [0] Couldn't find a config file []. Relying on env vars and pflags. ➜ ~ ``` ## Rollout Plan (if applicable) - revert changes to customer's config that disabled finalizers ## Upstream Changes Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [x] To be upstreamed to OSS ## Issue fixes: https://linear.app/unionai/issue/COR-1558/investigate-why-finalizers-consume-system-retries-in-map-tasks ## Checklist * [ ] Added tests * [x] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation Signed-off-by: Paul Dittamo --- .../pluginmachinery/flytek8s/config/config.go | 8 ++ .../flytek8s/config/k8spluginconfig_flags.go | 2 + .../config/k8spluginconfig_flags_test.go | 28 +++++++ .../nodes/task/k8s/plugin_manager.go | 79 +++++++++++++------ 4 files changed, 92 insertions(+), 25 deletions(-) diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 109ef06ba1..eb19015586 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -64,6 +64,8 @@ var ( DefaultPodTemplateResync: config2.Duration{ Duration: 30 * time.Second, }, + UpdateBaseBackoffDuration: 10, + UpdateBackoffRetries: 5, } // K8sPluginConfigSection provides a singular top level config section for all plugins. @@ -206,6 +208,12 @@ type K8sPluginConfig struct { // SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events). SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."` + + // Initial delay in exponential backoff when updating a resource in milliseconds. + UpdateBaseBackoffDuration int `json:"update-base-backoff-duration" pflag:",Initial delay in exponential backoff when updating a resource in milliseconds."` + + // Number of retries for exponential backoff when updating a resource. + UpdateBackoffRetries int `json:"update-backoff-retries" pflag:",Number of retries for exponential backoff when updating a resource."` } // FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go index 7a3f1c951e..4652d0bfd4 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags.go @@ -67,5 +67,7 @@ func (cfg K8sPluginConfig) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-name"), defaultK8sConfig.DefaultPodTemplateName, "Name of the PodTemplate to use as the base for all k8s pods created by FlytePropeller.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "default-pod-template-resync"), defaultK8sConfig.DefaultPodTemplateResync.String(), "Frequency of resyncing default pod templates") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "send-object-events"), defaultK8sConfig.SendObjectEvents, "If true, will send k8s object events in TaskExecutionEvent updates.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "update-base-backoff-duration"), defaultK8sConfig.UpdateBaseBackoffDuration, "Initial delay in exponential backoff when updating a resource in milliseconds.") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "update-backoff-retries"), defaultK8sConfig.UpdateBackoffRetries, "Number of retries for exponential backoff when updating a resource.") return cmdFlags } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go index 4d5918a3b5..cc46ffa466 100755 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go @@ -337,4 +337,32 @@ func TestK8sPluginConfig_SetFlags(t *testing.T) { } }) }) + t.Run("Test_update-base-backoff-duration", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("update-base-backoff-duration", testValue) + if vInt, err := cmdFlags.GetInt("update-base-backoff-duration"); err == nil { + testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vInt), &actual.UpdateBaseBackoffDuration) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_update-backoff-retries", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("update-backoff-retries", testValue) + if vInt, err := cmdFlags.GetInt("update-backoff-retries"); err == nil { + testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vInt), &actual.UpdateBackoffRetries) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index f9c3806ee6..42d3ad9b85 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" @@ -92,9 +93,11 @@ type PluginManager struct { kubeClient pluginsCore.KubeClient metrics PluginMetrics // Per namespace-resource - backOffController *backoff.Controller - resourceLevelMonitor *ResourceLevelMonitor - eventWatcher EventWatcher + backOffController *backoff.Controller + resourceLevelMonitor *ResourceLevelMonitor + eventWatcher EventWatcher + updateBaseBackoffDuration int + updateBackoffRetries int } func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { @@ -463,25 +466,48 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu } nsName = k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} + retryBackoff := wait.Backoff{ + Duration: time.Duration(e.updateBaseBackoffDuration) * time.Millisecond, + Factor: 2.0, + Jitter: 0.1, + Steps: e.updateBackoffRetries, + } + // Attempt to cleanup finalizers so that the object may be deleted/garbage collected. We try to clear them for all // objects, regardless of whether or not InjectFinalizer is configured to handle all cases where InjectFinalizer is // enabled/disabled during object execution. - if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { - if isK8sObjectNotExists(err) { - return nil + var lastErr error + _ = wait.ExponentialBackoff(retryBackoff, func() (bool, error) { + lastErr = nil + if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { + if isK8sObjectNotExists(err) { + return true, nil + } + lastErr = err + // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a + // Pod does not exist error. This should be retried using the retry policy + logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err) + return true, err } - // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a - // Pod does not exist error. This should be retried using the retry policy - logger.Warningf(ctx, "Failed in finalizing get Resource with name: %v. Error: %v", nsName, err) - return err - } - // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will - // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send - // the same event (idempotent) and then come here again... - err = e.clearFinalizers(ctx, o) - if err != nil { - errs.Append(err) + // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will + // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send + // the same event (idempotent) and then come here again... + if err := e.clearFinalizers(ctx, o); err != nil { + lastErr = err + // retry is if there is a conflict in case the informer cache is out of sync + if k8serrors.IsConflict(err) { + logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v. Retrying..", nsName, err) + return false, nil + } + logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v. Error: %v", nsName, err) + return true, err + } + return true, nil + }) + + if lastErr != nil { + errs.Append(lastErr) } // If we should delete the resource when finalize is called, do a best effort delete. @@ -630,8 +656,9 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } + k8sConfig := config.GetK8sPluginConfig() var eventWatcher EventWatcher - if config.GetK8sPluginConfig().SendObjectEvents { + if k8sConfig.SendObjectEvents { eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset) if err != nil { return nil, err @@ -645,13 +672,15 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry rm.RunCollectorOnce(ctx) return &PluginManager{ - id: entry.ID, - plugin: entry.Plugin, - resourceToWatch: entry.ResourceToWatch, - metrics: newPluginMetrics(metricsScope), - kubeClient: kubeClient, - resourceLevelMonitor: rm, - eventWatcher: eventWatcher, + id: entry.ID, + plugin: entry.Plugin, + resourceToWatch: entry.ResourceToWatch, + metrics: newPluginMetrics(metricsScope), + kubeClient: kubeClient, + resourceLevelMonitor: rm, + eventWatcher: eventWatcher, + updateBaseBackoffDuration: k8sConfig.UpdateBaseBackoffDuration, + updateBackoffRetries: k8sConfig.UpdateBackoffRetries, }, nil }