diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler.go b/flytepropeller/pkg/controller/nodes/dynamic/handler.go index 0d122bc1d..6f4adb5de 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler.go @@ -156,7 +156,7 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod case v1alpha1.DynamicNodePhaseFailing: err = d.Abort(ctx, nCtx, ds.Reason) if err != nil { - logger.Errorf(ctx, "Failing to abort dynamic workflow") + logger.Errorf(ctx, "Failing to abort dynamic workflow, reason [%s]", err) return trns, err } @@ -216,7 +216,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node func (d dynamicNodeTaskNodeHandler) finalizeParentNode(ctx context.Context, nCtx handler.NodeExecutionContext) error { logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt()) if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil { - logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.") + logger.Errorf(ctx, "Failed to finalize DynamicNodes Parent, reason: [%s]", err) return err } return nil diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 66f83b13b..cdc33ac24 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -500,6 +500,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r currentPhase := nCtx.NodeStateReader().GetTaskNodeState().PluginPhase logger.Debugf(ctx, "Abort invoked with phase [%v]", currentPhase) + // If the current Phase is terminal, nothing needs to be done, return. 
if currentPhase.IsTerminal() { logger.Debugf(ctx, "Returning immediately from Abort since task is already in terminal phase.", currentPhase) return nil diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index 42f6556a2..245c9c303 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -6,18 +6,19 @@ import ( "strings" "time" - "github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff" - v1 "k8s.io/api/core/v1" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flytestdlib/contextutils" stdErrors "github.com/lyft/flytestdlib/errors" + v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + "github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff" + "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/promutils/labeled" @@ -34,8 +35,9 @@ import ( "github.com/lyft/flyteplugins/go/tasks/errors" - nodeTaskConfig "github.com/lyft/flytepropeller/pkg/controller/nodes/task/config" "sigs.k8s.io/controller-runtime/pkg/handler" + + nodeTaskConfig "github.com/lyft/flytepropeller/pkg/controller/nodes/task/config" ) const finalizer = "flyte/flytek8s" @@ -294,26 +296,8 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio } func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { - logger.Infof(ctx, "KillTask invoked. We will attempt to delete object [%v].", + logger.Infof(ctx, "KillTask invoked. 
NO-OP for K8s plugins [%v].", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()) - - o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) - if err != nil { - // This will recurrent, so we will skip further finalize - logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", - tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) - return nil - } - - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) - - err = e.kubeClient.GetClient().Delete(ctx, o) - if err != nil && !IsK8sObjectNotExists(err) { - logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", - o.GetNamespace(), o.GetName(), err) - return err - } - return nil } @@ -333,17 +317,34 @@ func (e *PluginManager) ClearFinalizers(ctx context.Context, o k8s.Resource) err return nil } +// We first clear the finalizers, if finalizers are enabled. Following this we delete the object. +// This order is important, because clearing finalizer is a mutation, that uses the object resource version number +// to ensure consistent mutations. Deletion is also a mutation, that is independent of the object version. +// So Algorithm: +// - First build the object +// - If finalizers enabled +// - get the object, clear the finalizers and update the object +// - if it fails bubble up an error +// # Subsequent retries will continue clearing the finalizer as that is more important +// - Now issue a background deletion for the object. func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { - // If you change InjectFinalizer on the + logger.Infof(ctx, "Finalize invoked. 
K8s object deletion attempted [%v].", + tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()) + // The object is used by both the finalizer-clearing step and the deletion below, so build it first + // Build Identity Resource + o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) + if err != nil { + // This failure will recur, so we skip the rest of finalize + logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) + return nil + } + + // Add additional object metadata like name etc + AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + + // Remove finalizers, if finalizers are enabled! if config.GetK8sPluginConfig().InjectFinalizer { - o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) - if err != nil { - // This will recurrent, so we will skip further finalize - logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) - return nil - } - AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { @@ -364,6 +365,13 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu return err } } + // Delete the object + err = e.kubeClient.GetClient().Delete(ctx, o, client.PropagationPolicy(metav1.DeletePropagationBackground)) + if err != nil && !IsK8sObjectNotExists(err) { + logger.Warningf(ctx, "Failed to delete Resource with name: %v/%v. 
Error: %v", + o.GetNamespace(), o.GetName(), err) + return err + } return nil } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 6835e53e1..c4b32e3cb 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -14,7 +14,6 @@ import ( "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff" "github.com/lyft/flytestdlib/promutils" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -23,6 +22,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff" + pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core" pluginsCoreMock "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core/mocks" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s" @@ -46,6 +47,7 @@ type extendedFakeClient struct { CreateError error GetError error DeleteError error + UpdateError error } func (e extendedFakeClient) Create(ctx context.Context, obj runtime.Object, opts ...client.CreateOption) error { @@ -70,6 +72,14 @@ func (e extendedFakeClient) Delete(ctx context.Context, obj runtime.Object, opts return e.Client.Delete(ctx, obj, opts...) } +func (e extendedFakeClient) Update(ctx context.Context, obj runtime.Object, opts ...client.UpdateOption) error { + if e.UpdateError != nil { + return e.UpdateError + } + + return e.Client.Update(ctx, obj, opts...) 
+} + type k8sSampleHandler struct { } @@ -360,54 +370,15 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { func TestPluginManager_Abort(t *testing.T) { ctx := context.TODO() - tm := getMockTaskExecutionMetadata() - res := &v1.Pod{ - ObjectMeta: v12.ObjectMeta{ - Name: tm.GetTaskExecutionID().GetGeneratedName(), - Namespace: tm.GetNamespace(), - }, - } - - t.Run("Abort Pod Exists", func(t *testing.T) { - // common setup code - tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) - fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)} - - // common setup code - mockResourceHandler := &pluginsk8sMock.Plugin{} - mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) - mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) - pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ - ID: "x", - ResourceToWatch: &v1.Pod{}, - Plugin: mockResourceHandler, - }) - assert.NotNil(t, res) - assert.NoError(t, err) - - err = pluginManager.Abort(ctx, tctx) - assert.NoError(t, err) - }) - - t.Run("Abort Pod doesn't exist", func(t *testing.T) { - // common setup code - tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) - fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)} - // common setup code - mockResourceHandler := &pluginsk8sMock.Plugin{} - mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) - mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) - pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ - ID: "x", - ResourceToWatch: &v1.Pod{}, - Plugin: mockResourceHandler, - }) - assert.NotNil(t, res) - assert.NoError(t, err) - - err = pluginManager.Abort(ctx, tctx) - 
assert.NoError(t, err) - }) + tm := &pluginsCoreMock.TaskExecutionMetadata{} + id := &pluginsCoreMock.TaskExecutionID{} + id.OnGetGeneratedName().Return("test") + tm.OnGetTaskExecutionID().Return(id) + tctx := &pluginsCoreMock.TaskExecutionContext{} + tctx.OnTaskExecutionMetadata().Return(tm) + pluginManager := PluginManager{} + err := pluginManager.Abort(ctx, tctx) + assert.NoError(t, err) } func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { @@ -576,6 +547,116 @@ func TestAddObjectMetadata(t *testing.T) { assert.Equal(t, l, o.GetLabels()) } +func TestPluginManager_Finalize(t *testing.T) { + ctx := context.TODO() + tm := getMockTaskExecutionMetadata() + res := &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: tm.GetTaskExecutionID().GetGeneratedName(), + Namespace: tm.GetNamespace(), + Finalizers: []string{"f1"}, + }, + } + cfg := config.GetK8sPluginConfig() + cfg.InjectFinalizer = true + assert.NoError(t, config.SetK8sPluginConfig(cfg)) + + t.Run("Clear & Delete Pod Exists", func(t *testing.T) { + // common setup code + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)} + + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }) + assert.NotNil(t, res) + assert.NoError(t, err) + + err = pluginManager.Finalize(ctx, tctx) + assert.NoError(t, err) + err = fc.Get(ctx, k8stypes.NamespacedName{Namespace: res.Namespace, Name: res.Name}, res) + assert.Error(t, err) + assert.True(t, IsK8sObjectNotExists(err)) + }) + + t.Run("Clear & Delete Pod 
doesn't exist", func(t *testing.T) { + // common setup code + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)} + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }) + assert.NotNil(t, res) + assert.NoError(t, err) + + err = pluginManager.Finalize(ctx, tctx) + assert.NoError(t, err) + }) + + t.Run("Clear & Delete Pod Exists, delete failure", func(t *testing.T) { + // common setup code + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res), DeleteError: fmt.Errorf("failed")} + + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }) + assert.NotNil(t, res) + assert.NoError(t, err) + + res2 := &v1.Pod{} + err = pluginManager.Finalize(ctx, tctx) + assert.Error(t, err) + err = fc.Get(ctx, k8stypes.NamespacedName{Namespace: res.Namespace, Name: res.Name}, res2) + assert.NoError(t, err) + assert.Len(t, res2.Finalizers, 0) + }) + + t.Run("Clear & Delete Pod Exists, update failure", func(t *testing.T) { + // common setup code + tctx := 
getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res), UpdateError: fmt.Errorf("failed")} + + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }) + assert.NotNil(t, res) + assert.NoError(t, err) + + res2 := &v1.Pod{} + err = pluginManager.Finalize(ctx, tctx) + assert.Error(t, err) + err = fc.Get(ctx, k8stypes.NamespacedName{Namespace: res.Namespace, Name: res.Name}, res2) + assert.NoError(t, err) + assert.Len(t, res2.Finalizers, 1) + }) + +} + func init() { labeled.SetMetricKeys(contextutils.NamespaceKey) }