Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

BugFix: SubWorkflow and dynamic node handling failure #84

Merged
merged 8 commits into from
Mar 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
configSection = config.MustRegisterSection(configSectionKey, defaultConfig)

defaultConfig = &Config{
MaxWorkflowRetries: 5,
MaxDatasetSizeBytes: 10 * 1024 * 1024,
Queue: CompositeQueueConfig{
Type: CompositeQueueSimple,
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node
case v1alpha1.DynamicNodePhaseFailing:
fallthrough
case v1alpha1.DynamicNodePhaseExecuting:
logger.Infof(ctx, "Aborting dynamic workflow.")
logger.Infof(ctx, "Aborting dynamic workflow at RetryAttempt [%d]", nCtx.CurrentAttempt())
dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
if err != nil {
return err
Expand All @@ -183,7 +183,7 @@ func (d dynamicNodeTaskNodeHandler) Abort(ctx context.Context, nCtx handler.Node

return d.nodeExecutor.AbortHandler(ctx, dynamicWF, dynamicWF.StartNode(), reason)
default:
logger.Infof(ctx, "Aborting regular node.")
logger.Infof(ctx, "Aborting regular node RetryAttempt [%d]", nCtx.CurrentAttempt())
// The parent node has not yet completed, so we will abort the parent node
return d.TaskNodeHandler.Abort(ctx, nCtx, reason)
}
Expand All @@ -195,7 +195,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N

ds := nCtx.NodeStateReader().GetDynamicNodeState()
if ds.Phase == v1alpha1.DynamicNodePhaseFailing || ds.Phase == v1alpha1.DynamicNodePhaseExecuting {
logger.Infof(ctx, "Finalizing dynamic workflow")
logger.Infof(ctx, "Finalizing dynamic workflow RetryAttempt [%d]", nCtx.CurrentAttempt())
dynamicWF, isDynamic, err := d.buildContextualDynamicWorkflow(ctx, nCtx)
if err != nil {
errs = append(errs, err)
Expand All @@ -212,7 +212,7 @@ func (d dynamicNodeTaskNodeHandler) Finalize(ctx context.Context, nCtx handler.N
// We should always finalize the parent node success or failure.
// If we use the phase to decide when to finalize in the case where Dynamic node is in phase Executiing
// (i.e. child nodes are now being executed) and Finalize is invoked, we will never invoke the finalizer for the parent.
logger.Infof(ctx, "Finalizing Parent node")
logger.Infof(ctx, "Finalizing Parent node RetryAttempt [%d]", nCtx.CurrentAttempt())
if err := d.TaskNodeHandler.Finalize(ctx, nCtx); err != nil {
logger.Errorf(ctx, "Failed to finalize Dynamic Nodes Parent.")
errs = append(errs, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/dynamic/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func TestDynamicNodeTaskNodeHandler_Finalize(t *testing.T) {
sr := &nodeMocks.NodeStateReader{}
sr.OnGetDynamicNodeState().Return(s)
nCtx.OnNodeStateReader().Return(sr)
nCtx.OnCurrentAttempt().Return(0)

h := &mocks.TaskNodeHandler{}
h.OnFinalize(ctx, nCtx).Return(nil)
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
return executors.NodeStatusUndefined, err
}

// NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state
// Attempt is used throughout the system to determine the idempotent resource version.
nodeStatus.IncrementAttempts()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add docs for why we should do it here and not after the handle() returns?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying")
// We are going to retry in the next round, so we should clear all current state
nodeStatus.ClearSubNodeStatus()
Expand All @@ -399,7 +402,6 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork
}

if p.GetPhase() == handler.EPhaseRetryableFailure {
nodeStatus.IncrementAttempts()
if p.GetErr() != nil && p.GetErr().GetKind() == core.ExecutionError_SYSTEM {
nodeStatus.IncrementSystemFailures()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,9 +739,9 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
},
false, true, core.NodeExecution_FAILED, 0},

{"(retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) {
{"retryablefailure->running", v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseRunning, executors.NodePhasePending, func() (handler.Transition, error) {
return handler.UnknownTransition, fmt.Errorf("should not be invoked")
}, false, false, core.NodeExecution_RUNNING, 0},
}, false, false, core.NodeExecution_RUNNING, 1},

{"running->failing", v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, executors.NodePhasePending, func() (handler.Transition, error) {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure("code", "reason", nil)), nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/nodes/subworkflow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (w *workflowNodeHandler) FinalizeRequired() bool {
return false
}

func (w *workflowNodeHandler) Setup(ctx context.Context, setupContext handler.SetupContext) error {
func (w *workflowNodeHandler) Setup(_ context.Context, _ handler.SetupContext) error {
return nil
}

Expand Down Expand Up @@ -94,17 +94,17 @@ func (w *workflowNodeHandler) Abort(ctx context.Context, nCtx handler.NodeExecut
wf := nCtx.Workflow()
wfNode := nCtx.Node().GetWorkflowNode()
if wfNode.GetSubWorkflowRef() != nil {
return w.subWfHandler.HandleAbort(ctx, nCtx, wf, *wfNode.GetSubWorkflowRef())
return w.subWfHandler.HandleAbort(ctx, nCtx, wf, *wfNode.GetSubWorkflowRef(), reason)
}

if wfNode.GetLaunchPlanRefID() != nil {
return w.lpHandler.HandleAbort(ctx, wf, nCtx.Node())
return w.lpHandler.HandleAbort(ctx, wf, nCtx.Node(), reason)
}
return nil
}

func (w *workflowNodeHandler) Finalize(ctx context.Context, executionContext handler.NodeExecutionContext) error {
logger.Debugf(ctx, "WorkflowNode::Finalizer: nothing to do")
func (w *workflowNodeHandler) Finalize(ctx context.Context, _ handler.NodeExecutionContext) error {
logger.Warnf(ctx, "Subworkflow finalize invoked. Nothing to be done")
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(nil)), nil
}

func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode) error {
func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, reason string) error {
nodeStatus := w.GetNodeExecutionStatus(ctx, node.GetID())
childID, err := GetChildWorkflowExecutionID(
w.GetExecutionID().WorkflowExecutionIdentifier,
Expand All @@ -160,5 +160,5 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, w v1alpha1.Executab
// THIS SHOULD NEVER HAPPEN
return err
}
return l.launchPlan.Kill(ctx, childID, fmt.Sprintf("parent execution id [%s] aborted", w.GetName()))
return l.launchPlan.Kill(ctx, childID, fmt.Sprintf("parent execution id [%s] aborted, reason [%s]", w.GetName(), reason))
}
15 changes: 7 additions & 8 deletions pkg/controller/nodes/subworkflow/launchplan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
mocks2 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/lyft/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/mocks"
"github.com/lyft/flytepropeller/pkg/utils"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func createInmemoryStore(t testing.TB) *storage.DataStore {
Expand Down Expand Up @@ -614,7 +615,6 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) {

t.Run("abort-success", func(t *testing.T) {
mockLPExec := &mocks.Executor{}
//mockStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, storage.NewDefaultProtobufStore(utils.FailingRawStore{}, promutils.NewTestScope()))
mockLPExec.On("Kill",
ctx,
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
Expand All @@ -626,14 +626,13 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) {
h := launchPlanHandler{
launchPlan: mockLPExec,
}
err := h.HandleAbort(ctx, mockWf, mockNode)
err := h.HandleAbort(ctx, mockWf, mockNode, "some reason")
assert.NoError(t, err)
})

t.Run("abort-fail", func(t *testing.T) {
expectedErr := fmt.Errorf("fail")
mockLPExec := &mocks.Executor{}
// mockStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, storage.NewDefaultProtobufStore(utils.FailingRawStore{}, promutils.NewTestScope()))
mockLPExec.On("Kill",
ctx,
mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool {
Expand All @@ -645,7 +644,7 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) {
h := launchPlanHandler{
launchPlan: mockLPExec,
}
err := h.HandleAbort(ctx, mockWf, mockNode)
err := h.HandleAbort(ctx, mockWf, mockNode, "reason")
assert.Error(t, err)
assert.Equal(t, err, expectedErr)
})
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/nodes/subworkflow/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"fmt"

"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/executors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytestdlib/storage"
)

// TODO Add unit tests for subworkflow handler
Expand Down Expand Up @@ -174,7 +175,7 @@ func (s *subworkflowHandler) HandleSubWorkflowFailingNode(ctx context.Context, n
return s.DoInFailureHandling(ctx, nCtx, contextualSubWorkflow)
}

func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, w v1alpha1.ExecutableWorkflow, workflowID v1alpha1.WorkflowID) error {
func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, w v1alpha1.ExecutableWorkflow, workflowID v1alpha1.WorkflowID, reason string) error {
subWorkflow := w.FindSubWorkflow(workflowID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the w here is the highest level, parent workflow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

contextual parent

if subWorkflow == nil {
return fmt.Errorf("no sub workflow [%s] found in node [%s]", workflowID, nCtx.NodeID())
Expand All @@ -183,12 +184,12 @@ func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeE
nodeStatus := w.GetNodeExecutionStatus(ctx, nCtx.NodeID())
contextualSubWorkflow := executors.NewSubContextualWorkflow(w, subWorkflow, nodeStatus)

startNode := w.StartNode()
startNode := contextualSubWorkflow.StartNode()
if startNode == nil {
return fmt.Errorf("no sub workflow [%s] found in node [%s]", workflowID, nCtx.NodeID())
}

return s.nodeExecutor.AbortHandler(ctx, contextualSubWorkflow, startNode, "")
return s.nodeExecutor.AbortHandler(ctx, contextualSubWorkflow, startNode, reason)
}

func newSubworkflowHandler(nodeExecutor executors.Node) subworkflowHandler {
Expand Down
78 changes: 78 additions & 0 deletions pkg/controller/nodes/subworkflow/subworkflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package subworkflow

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
coreMocks "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
execMocks "github.com/lyft/flytepropeller/pkg/controller/executors/mocks"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler/mocks"
)

func Test_subworkflowHandler_HandleAbort(t *testing.T) {
ctx := context.TODO()

t.Run("missing-subworkflow", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
wf.OnFindSubWorkflow("x").Return(nil)
nCtx.OnNodeID().Return("n1")
assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})

t.Run("missing-startNode", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
st := &coreMocks.ExecutableNodeStatus{}
swf := &coreMocks.ExecutableSubWorkflow{}
wf.OnFindSubWorkflow("x").Return(swf)
wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st)
nCtx.OnNodeID().Return("n1")
swf.OnStartNode().Return(nil)
assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})

t.Run("abort-error", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
st := &coreMocks.ExecutableNodeStatus{}
swf := &coreMocks.ExecutableSubWorkflow{}
wf.OnFindSubWorkflow("x").Return(swf)
wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st)
nCtx.OnNodeID().Return("n1")
n := &coreMocks.ExecutableNode{}
swf.OnStartNode().Return(n)
nodeExec.OnAbortHandler(ctx, wf, n, "reason").Return(fmt.Errorf("err"))
assert.Error(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})

t.Run("abort-success", func(t *testing.T) {
nCtx := &mocks.NodeExecutionContext{}
nodeExec := &execMocks.Node{}
s := newSubworkflowHandler(nodeExec)
wf := &coreMocks.ExecutableWorkflow{}
st := &coreMocks.ExecutableNodeStatus{}
swf := &coreMocks.ExecutableSubWorkflow{}
wf.OnFindSubWorkflow("x").Return(swf)
wf.OnGetNodeExecutionStatus(ctx, "n1").Return(st)
nCtx.OnNodeID().Return("n1")
n := &coreMocks.ExecutableNode{}
swf.OnStartNode().Return(n)
swf.OnGetID().Return("swf")
nodeExec.OnAbortHandlerMatch(mock.Anything, mock.MatchedBy(func(wf v1alpha1.ExecutableWorkflow) bool {
return wf.GetID() == swf.GetID()
}), n, mock.Anything).Return(nil)
assert.NoError(t, s.HandleAbort(ctx, nCtx, wf, "x", "reason"))
})
}