Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Delete K8s Resources on abort for K8s plugins #122

Merged
merged 1 commit into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,24 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio
}

func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error {
logger.Infof(ctx, "KillTask invoked for %v, nothing to be done.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
logger.Infof(ctx, "KillTask invoked. We will attempt to delete object [%v].",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())

o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata())
if err != nil {
// This will recurrent, so we will skip further finalize
logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
return nil
}

err = e.kubeClient.GetClient().Delete(ctx, o)
if err != nil && !IsK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
o.GetNamespace(), o.GetName(), err)
return err
}

return nil
}

Expand Down
62 changes: 62 additions & 0 deletions pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"testing"

"k8s.io/client-go/kubernetes/scheme"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"
Expand Down Expand Up @@ -43,6 +45,7 @@ type extendedFakeClient struct {
client.Client
CreateError error
GetError error
DeleteError error
}

func (e extendedFakeClient) Create(ctx context.Context, obj runtime.Object, opts ...client.CreateOption) error {
Expand All @@ -59,6 +62,14 @@ func (e extendedFakeClient) Get(ctx context.Context, key client.ObjectKey, obj r
return e.Client.Get(ctx, key, obj)
}

func (e extendedFakeClient) Delete(ctx context.Context, obj runtime.Object, opts ...client.DeleteOption) error {
if e.DeleteError != nil {
return e.DeleteError
}

return e.Client.Delete(ctx, obj, opts...)
}

type k8sSampleHandler struct {
}

Expand Down Expand Up @@ -347,6 +358,57 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
})
}

func TestPluginManager_Abort(t *testing.T) {
ctx := context.TODO()
tm := getMockTaskExecutionMetadata()
res := &v1.Pod{
ObjectMeta: v12.ObjectMeta{
Name: tm.GetTaskExecutionID().GetGeneratedName(),
Namespace: tm.GetNamespace(),
},
}

t.Run("Abort Pod Exists", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
})

t.Run("Abort Pod doesn't exist", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
})
}

func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
ctx := context.TODO()
tm := getMockTaskExecutionMetadata()
Expand Down