Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Bug; abort fails for a task and running pods are not deleted
Browse files Browse the repository at this point in the history
Abort always fails for a task if task was already in a terminal state - success, failure or retryable fail. This is because the event publish fails.
This fix ensures an event is not published for terminal cases.

 - [x] Bug Fix
 - [ ] Feature
 - [ ] Plugin

 - [x] Code completed
 - [x] Smoke tested
 - [x] Unit tests added
 - [x] Code documentation added
 - [x] Any pending items have an associated Issue

NA

flyteorg/flyte#333

NA
  • Loading branch information
Ketan Umare committed May 29, 2020
1 parent 7f40317 commit 39163c3
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 82 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (d dynamicNodeTaskNodeHandler) Handle(ctx context.Context, nCtx handler.Nod
case v1alpha1.DynamicNodePhaseFailing:
err = d.Abort(ctx, nCtx, ds.Reason)
if err != nil {
logger.Errorf(ctx, "Failing to abort dynamic workflow")
logger.Errorf(ctx, "Failing to abort dynamic workflow, reason [%s]", err)
return trns, err
}

Expand Down Expand Up @@ -216,7 +216,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node
func (d dynamicNodeTaskNodeHandler) finalizeParentNode(ctx context.Context, nCtx handler.NodeExecutionContext) error {
logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt())
if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil {
logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.")
logger.Errorf(ctx, "Failed to finalize DynamicNodes Parent, reason: [%s]", err)
return err
}
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ func (t Handler) Abort(ctx context.Context, nCtx handler.NodeExecutionContext, r
currentPhase := nCtx.NodeStateReader().GetTaskNodeState().PluginPhase
logger.Debugf(ctx, "Abort invoked with phase [%v]", currentPhase)

// If the current Phase is terminal, nothing needs to be done, return.
if currentPhase.IsTerminal() {
logger.Debugf(ctx, "Returning immediately from Abort since task is already in terminal phase.", currentPhase)
return nil
Expand Down
70 changes: 39 additions & 31 deletions pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"strings"
"time"

"github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff"
v1 "k8s.io/api/core/v1"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/lyft/flytestdlib/contextutils"
stdErrors "github.com/lyft/flytestdlib/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff"

"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"

Expand All @@ -34,8 +35,9 @@ import (

"github.com/lyft/flyteplugins/go/tasks/errors"

nodeTaskConfig "github.com/lyft/flytepropeller/pkg/controller/nodes/task/config"
"sigs.k8s.io/controller-runtime/pkg/handler"

nodeTaskConfig "github.com/lyft/flytepropeller/pkg/controller/nodes/task/config"
)

const finalizer = "flyte/flytek8s"
Expand Down Expand Up @@ -294,26 +296,8 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio
}

func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error {
logger.Infof(ctx, "KillTask invoked. We will attempt to delete object [%v].",
logger.Infof(ctx, "KillTask invoked. NO-OP for K8s plugins [%v].",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())

o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata())
if err != nil {
// This will recurrent, so we will skip further finalize
logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())

err = e.kubeClient.GetClient().Delete(ctx, o)
if err != nil && !IsK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
o.GetNamespace(), o.GetName(), err)
return err
}

return nil
}

Expand All @@ -333,17 +317,34 @@ func (e *PluginManager) ClearFinalizers(ctx context.Context, o k8s.Resource) err
return nil
}

// We first clear the finalizers, if finalizers are enabled. Following this we delete the object.
// This order is important, because clearing finalizer is a mutation, that uses the object resource version number
// to ensure consistent mutations. Deletion is also a mutation, that is independent of the object version.
// So Algorithm:
// - First build the object
// - If finalizers enabled
// - get the object, clear the finalizers and update the object
// - if it fails bubble up an error
// # Subsequent retries will continue clearing the finalizer as that is more important
// - Now issue a background deletion for the object.
func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error {
// If you change InjectFinalizer on the
logger.Infof(ctx, "Finalize invoked. K8s object deletion attempted [%v].",
tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
// We will always delete the object first
// Build Identity Resource
o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata())
if err != nil {
// This will recurrent, so we will skip further finalize
logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
return nil
}

// Add additional object metadata like name etc
AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())

// Remove finalizers, if finalizers are enabled!
if config.GetK8sPluginConfig().InjectFinalizer {
o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata())
if err != nil {
// This will recurrent, so we will skip further finalize
logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err)
return nil
}

AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig())
nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()}
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil {
Expand All @@ -364,6 +365,13 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu
return err
}
}
// Delete the object
err = e.kubeClient.GetClient().Delete(ctx, o, client.PropagationPolicy(metav1.DeletePropagationBackground))
if err != nil && !IsK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to delete Resource with name: %v/%v. Error: %v",
o.GetNamespace(), o.GetName(), err)
return err
}
return nil
}

Expand Down
179 changes: 130 additions & 49 deletions pkg/controller/nodes/task/k8s/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff"
"github.com/lyft/flytestdlib/promutils"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -23,6 +22,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff"

pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
pluginsCoreMock "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
Expand All @@ -46,6 +47,7 @@ type extendedFakeClient struct {
CreateError error
GetError error
DeleteError error
UpdateError error
}

func (e extendedFakeClient) Create(ctx context.Context, obj runtime.Object, opts ...client.CreateOption) error {
Expand All @@ -70,6 +72,14 @@ func (e extendedFakeClient) Delete(ctx context.Context, obj runtime.Object, opts
return e.Client.Delete(ctx, obj, opts...)
}

func (e extendedFakeClient) Update(ctx context.Context, obj runtime.Object, opts ...client.UpdateOption) error {
if e.UpdateError != nil {
return e.UpdateError
}

return e.Client.Update(ctx, obj, opts...)
}

type k8sSampleHandler struct {
}

Expand Down Expand Up @@ -360,54 +370,15 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) {

func TestPluginManager_Abort(t *testing.T) {
ctx := context.TODO()
tm := getMockTaskExecutionMetadata()
res := &v1.Pod{
ObjectMeta: v12.ObjectMeta{
Name: tm.GetTaskExecutionID().GetGeneratedName(),
Namespace: tm.GetNamespace(),
},
}

t.Run("Abort Pod Exists", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)}

// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
})

t.Run("Abort Pod doesn't exist", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
})
tm := &pluginsCoreMock.TaskExecutionMetadata{}
id := &pluginsCoreMock.TaskExecutionID{}
id.OnGetGeneratedName().Return("test")
tm.OnGetTaskExecutionID().Return(id)
tctx := &pluginsCoreMock.TaskExecutionContext{}
tctx.OnTaskExecutionMetadata().Return(tm)
pluginManager := PluginManager{}
err := pluginManager.Abort(ctx, tctx)
assert.NoError(t, err)
}

func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) {
Expand Down Expand Up @@ -576,6 +547,116 @@ func TestAddObjectMetadata(t *testing.T) {
assert.Equal(t, l, o.GetLabels())
}

func TestPluginManager_Finalize(t *testing.T) {
ctx := context.TODO()
tm := getMockTaskExecutionMetadata()
res := &v1.Pod{
ObjectMeta: v12.ObjectMeta{
Name: tm.GetTaskExecutionID().GetGeneratedName(),
Namespace: tm.GetNamespace(),
Finalizers: []string{"f1"},
},
}
cfg := config.GetK8sPluginConfig()
cfg.InjectFinalizer = true
assert.NoError(t, config.SetK8sPluginConfig(cfg))

t.Run("Clear & Delete Pod Exists", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)}

// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Finalize(ctx, tctx)
assert.NoError(t, err)
err = fc.Get(ctx, k8stypes.NamespacedName{Namespace: res.Namespace, Name: res.Name}, res)
assert.Error(t, err)
assert.True(t, IsK8sObjectNotExists(err))
})

t.Run("Clear & Delete Pod doesn't exist", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)}
// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

err = pluginManager.Finalize(ctx, tctx)
assert.NoError(t, err)
})

t.Run("Clear & Delete Pod Exists, delete failure", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res), DeleteError: fmt.Errorf("failed")}

// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

res2 := &v1.Pod{}
err = pluginManager.Finalize(ctx, tctx)
assert.Error(t, err)
err = fc.Get(ctx, k8stypes.NamespacedName{Namespace: res.Namespace, Name: res.Name}, res2)
assert.NoError(t, err)
assert.Len(t, res2.Finalizers, 0)
})

t.Run("Clear & Delete Pod Exists, update failure", func(t *testing.T) {
// common setup code
tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted)
fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res), UpdateError: fmt.Errorf("failed")}

// common setup code
mockResourceHandler := &pluginsk8sMock.Plugin{}
mockResourceHandler.OnBuildIdentityResourceMatch(mock.Anything, tctx.TaskExecutionMetadata()).Return(&v1.Pod{}, nil)
mockResourceHandler.OnGetTaskPhaseMatch(mock.Anything, mock.Anything, mock.Anything).Return(pluginsCore.PhaseInfo{}, nil)
pluginManager, err := NewPluginManager(ctx, dummySetupContext(fc), k8s.PluginEntry{
ID: "x",
ResourceToWatch: &v1.Pod{},
Plugin: mockResourceHandler,
})
assert.NotNil(t, res)
assert.NoError(t, err)

res2 := &v1.Pod{}
err = pluginManager.Finalize(ctx, tctx)
assert.Error(t, err)
err = fc.Get(ctx, k8stypes.NamespacedName{Namespace: res.Namespace, Name: res.Name}, res2)
assert.NoError(t, err)
assert.Len(t, res2.Finalizers, 1)
})

}

func init() {
labeled.SetMetricKeys(contextutils.NamespaceKey)
}

0 comments on commit 39163c3

Please sign in to comment.