Skip to content

Commit

Permalink
Node executor abort to call finalize even on error (flyteorg#51)
Browse files Browse the repository at this point in the history
In cases where the abort call fails, we should still call finalize as this is the intended behavior of the finalize construct.
  • Loading branch information
wild-endeavor authored Dec 31, 2019
1 parent 63983bd commit 1933683
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 8 deletions.
7 changes: 1 addition & 6 deletions flytepropeller/boilerplate/lyft/golang_test_targets/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,7 @@ DEP_SHA=1f7c19e5f52f49ffb9f956f64c010be14683468b
.PHONY: lint
lint: #lints the package for common code smells
which golangci-lint || GO111MODULE=on go install github.com/golangci/golangci-lint/cmd/golangci-lint
# Calling lint twice here is a hack. The first call seem to fail when internally calling `go list...`
# However, that call seem to have some effects (e.g. https://github.com/golang/go/issues/29452) which, for some
# reason, allows the subsequent calls to succeed.
# TODO: Evaluate whether this is still a problem after moving admin dependency system to go modules..
# GO111MODULE=off GL_DEBUG=linters_output,loader,env golangci-lint run --exclude deprecated -v || true
GO111MODULE=off GL_DEBUG=linters_output,loader,env golangci-lint run --deadline=5m --exclude deprecated -v
GO111MODULE=off golangci-lint run --deadline=5m --exclude deprecated -v

# If code is failing goimports linter, this will fix.
# skips 'vendor'
Expand Down
6 changes: 5 additions & 1 deletion flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,14 @@ func (c *nodeExecutor) execute(ctx context.Context, h handler.Node, nCtx *execCo
func (c *nodeExecutor) abort(ctx context.Context, h handler.Node, nCtx handler.NodeExecutionContext, reason string) error {
logger.Debugf(ctx, "Calling aborting & finalize")
if err := h.Abort(ctx, nCtx, reason); err != nil {
finalizeErr := h.Finalize(ctx, nCtx)
if finalizeErr != nil {
return errors.ErrorCollection{Errors: []error{err, finalizeErr}}
}
return err
}
return h.Finalize(ctx, nCtx)

return h.Finalize(ctx, nCtx)
}

func (c *nodeExecutor) finalize(ctx context.Context, h handler.Node, nCtx handler.NodeExecutionContext) error {
Expand Down
48 changes: 48 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1221,3 +1221,51 @@ func Test_nodeExecutor_timeout(t *testing.T) {
})
}
}

func Test_nodeExecutor_abort(t *testing.T) {
ctx := context.Background()
exec := nodeExecutor{}
nCtx := &execContext{}

t.Run("abort error calls finalize", func(t *testing.T) {
h := &nodeHandlerMocks.Node{}
h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test error"))
h.OnFinalizeRequired().Return(true)
var called bool
h.OnFinalizeMatch(mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
called = true
}).Return(nil)

err := exec.abort(ctx, h, nCtx, "testing")
assert.Equal(t, "test error", err.Error())
assert.True(t, called)
})

t.Run("abort error calls finalize with error", func(t *testing.T) {
h := &nodeHandlerMocks.Node{}
h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(errors.New("test error"))
h.OnFinalizeRequired().Return(true)
var called bool
h.OnFinalizeMatch(mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
called = true
}).Return(errors.New("finalize error"))

err := exec.abort(ctx, h, nCtx, "testing")
assert.Equal(t, "0: test error\r\n1: finalize error\r\n", err.Error())
assert.True(t, called)
})

t.Run("abort calls finalize when no errors", func(t *testing.T) {
h := &nodeHandlerMocks.Node{}
h.OnAbortMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil)
h.OnFinalizeRequired().Return(true)
var called bool
h.OnFinalizeMatch(mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
called = true
}).Return(nil)

err := exec.abort(ctx, h, nCtx, "testing")
assert.NoError(t, err)
assert.True(t, called)
})
}
2 changes: 1 addition & 1 deletion flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (t Handler) Finalize(ctx context.Context, nCtx handler.NodeExecutionContext
}

func New(ctx context.Context, kubeClient executors.Client, client catalog.Client, scope promutils.Scope) (*Handler, error) {
// TODO NewShould take apointer
// TODO New should take a pointer
async, err := catalog.NewAsyncClient(client, *catalog.GetConfig(), scope.NewSubScope("async_catalog"))
if err != nil {
return nil, err
Expand Down

0 comments on commit 1933683

Please sign in to comment.