Skip to content

Commit

Permalink
Delete CRDs on abort for K8s plugins (flyteorg#122)
Browse files Browse the repository at this point in the history
  • Loading branch information
EngHabu authored Apr 22, 2020
1 parent 56cd3ca commit 0da8fa2
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 1 deletion.
19 changes: 18 additions & 1 deletion flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,24 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio
}

func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error {
logger.Infof(ctx, "KillTask invoked for %v, nothing to be done.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
logger.Infof(ctx, "KillTask invoked. We will attempt to delete object [%v].",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())

o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata())
if err != nil {
// This will recurrent, so we will skip further finalize
logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
return nil
}

err = e.kubeClient.GetClient().Delete(ctx, o)
if err != nil && !IsK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
o.GetNamespace(), o.GetName(), err)
return err
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"testing"

"k8s.io/client-go/kubernetes/scheme"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"
Expand Down Expand Up @@ -43,6 +45,7 @@ type extendedFakeClient struct {
client.Client
CreateError error
GetError error
DeleteError error
}

func (e extendedFakeClient) Create(ctx context.Context, obj runtime.Object, opts ...client.CreateOption) error {
Expand All @@ -59,6 +62,14 @@ func (e extendedFakeClient) Get(ctx context.Context, key client.ObjectKey, obj r
return e.Client.Get(ctx, key, obj)
}

func (e extendedFakeClient) Delete(ctx context.Context, obj runtime.Object, opts ...client.DeleteOption) error {
if e.DeleteError != nil {
return e.DeleteError
}

return e.Client.Delete(ctx, obj, opts...)
}

type k8sSampleHandler struct {
}

Expand Down Expand Up @@ -347,6 +358,57 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {
})
}

func TestPluginManager_Abort(t *testing.T) {
ctx := context.TODO()
tm := getMockTaskExecutionMetadata()
res := &v1.Pod{
ObjectMeta: v12.ObjectMeta{
Name: tm.GetTaskExecutionID().GetGeneratedName(),
Namespace: tm.GetNamespace(),
},
}

t.Run("Abort Pod Exists", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
})

t.Run("Abort Pod doesn't exist", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
})
}

func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
ctx := context.TODO()
tm := getMockTaskExecutionMetadata()
Expand Down

0 comments on commit 0da8fa2

Please sign in to comment.