From 0da8fa20233d5e5d807a46248d85178cae86f362 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Wed, 22 Apr 2020 15:16:22 -0700 Subject: [PATCH] Delete CRDs on abort for K8s plugins (#122) --- .../nodes/task/k8s/plugin_manager.go | 19 +++++- .../nodes/task/k8s/plugin_manager_test.go | 62 +++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index a174530ae6..998bbc42b0 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -294,7 +294,24 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio } func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { - logger.Infof(ctx, "KillTask invoked for %v, nothing to be done.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()) + logger.Infof(ctx, "KillTask invoked. We will attempt to delete object [%v].", + tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()) + + o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) + if err != nil { + // This will recurrent, so we will skip further finalize + logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", + tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) + return nil + } + + err = e.kubeClient.GetClient().Delete(ctx, o) + if err != nil && !IsK8sObjectNotExists(err) { + logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", + o.GetNamespace(), o.GetName(), err) + return err + } + return nil } diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 2574390719..b1e0d4cbcd 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -6,6 +6,8 @@ import ( "fmt" "testing" + "k8s.io/client-go/kubernetes/scheme" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s" "github.com/lyft/flytestdlib/contextutils" "github.com/lyft/flytestdlib/promutils/labeled" @@ -43,6 +45,7 @@ type extendedFakeClient struct { client.Client CreateError error GetError error + DeleteError error } func (e extendedFakeClient) Create(ctx context.Context, obj runtime.Object, opts ...client.CreateOption) error { @@ -59,6 +62,14 @@ func (e extendedFakeClient) Get(ctx context.Context, key client.ObjectKey, obj r return e.Client.Get(ctx, key, obj) } +func (e extendedFakeClient) Delete(ctx context.Context, obj runtime.Object, opts ...client.DeleteOption) error { + if e.DeleteError != nil { + return e.DeleteError + } + + return e.Client.Delete(ctx, obj, opts...) +} + type k8sSampleHandler struct { } @@ -347,6 +358,57 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { }) } +func TestPluginManager_Abort(t *testing.T) { + ctx := context.TODO() + tm := getMockTaskExecutionMetadata() + res := &v1.Pod{ + ObjectMeta: v12.ObjectMeta{ + Name: tm.GetTaskExecutionID().GetGeneratedName(), + Namespace: tm.GetNamespace(), + }, + } + + t.Run("Abort Pod Exists", func(t *testing.T) { + // common setup code + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)} + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }) + assert.NotNil(t, res) + assert.NoError(t, err) + + err = pluginManager.Abort(ctx, tctx) + assert.NoError(t, err) + }) + + t.Run("Abort Pod doesn't exist", func(t *testing.T) { + // common setup code + tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) + fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)} + // common setup code + mockResourceHandler := &pluginsk8sMock.Plugin{} + mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil) + mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil) + pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{ + ID: "x", + ResourceToWatch: &v1.Pod{}, + Plugin: mockResourceHandler, + }) + assert.NotNil(t, res) + assert.NoError(t, err) + + err = pluginManager.Abort(ctx, tctx) + assert.NoError(t, err) + }) +} + func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { ctx := context.TODO() tm := getMockTaskExecutionMetadata()