Skip to content

Commit

Permalink
added configuration for arraynode default parallelism behavior (flyte…
Browse files Browse the repository at this point in the history
…org#5268)

* added configuration for arraynode default parallelism behavior

Signed-off-by: Daniel Rammer <[email protected]>

* added unit tests and fixed linter

Signed-off-by: Daniel Rammer <[email protected]>

* cleanup / docs

Signed-off-by: Daniel Rammer <[email protected]>

* fixed ytpo

Signed-off-by: Daniel Rammer <[email protected]>

* docs update

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored and austin362667 committed May 7, 2024
1 parent 916a36b commit fa4ad71
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 45 deletions.
32 changes: 30 additions & 2 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,11 @@ var (
},
ClusterID: "propeller",
CreateFlyteWorkflowCRD: false,
ArrayNodeEventVersion: 0,
NodeExecutionWorkerCount: 8,
ArrayNode: ArrayNodeConfig{
EventVersion: 0,
DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
},
}
)

Expand Down Expand Up @@ -156,8 +159,8 @@ type Config struct {
ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"`
ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down Expand Up @@ -258,6 +261,31 @@ type EventConfig struct {
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
}

// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
type ParallelismBehavior = string

const (
// ParallelismBehaviorHybrid means that ArrayNode will adhere to the parallelism defined in the
// ArrayNode exactly. This means `nil` will use the workflow parallelism, and 0 will have
// unlimited parallelism.
ParallelismBehaviorHybrid ParallelismBehavior = "hybrid"

// ParallelismBehaviorUnlimited means that ArrayNode subNodes will be evaluated with unlimited
// parallelism for both nil and 0. If a non-default (ie. nil / zero) parallelism is set, then
// ArrayNode will adhere to that value.
ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited"

// ParallelismBehaviorWorkflow means that ArrayNode subNodes will be evaluated using the
// configured workflow parallelism for both nil and 0. If a non-default (ie. nil / zero)
// parallelism is set, then ArrayNode will adhere to that value.
ParallelismBehaviorWorkflow ParallelismBehavior = "workflow"
)

type ArrayNodeConfig struct {
EventVersion int `json:"event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
DefaultParallelismBehavior ParallelismBehavior `json:"default-parallelism-behavior" pflag:",Default parallelism behavior for array nodes"`
}

// GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand Down
3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions flytepropeller/pkg/controller/config/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (*passThroughEventRecorder) finalizeRequired(ctx context.Context) bool {
}

func newArrayEventRecorder(eventRecorder interfaces.EventRecorder) arrayEventRecorder {
if config.GetConfig().ArrayNodeEventVersion == 0 {
if config.GetConfig().ArrayNode.EventVersion == 0 {
return &externalResourcesEventRecorder{
EventRecorder: eventRecorder,
}
Expand Down
34 changes: 14 additions & 20 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,26 +252,14 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting
case v1alpha1.ArrayNodePhaseExecuting:
// process array node subNodes
remainingWorkflowParallelism := int(nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism - nCtx.ExecutionContext().CurrentParallelism())
incrementWorkflowParallelism, maxParallelism := inferParallelism(ctx, arrayNode.GetParallelism(),
config.GetConfig().ArrayNode.DefaultParallelismBehavior, remainingWorkflowParallelism, len(arrayNodeState.SubNodePhases.GetItems()))

availableParallelism := 0
// using the workflow's parallelism if the array node parallelism is not set
useWorkflowParallelism := arrayNode.GetParallelism() == nil
if useWorkflowParallelism {
// greedily take all available slots
// TODO: This will need to be re-evaluated if we want to support dynamics & sub_workflows
currentParallelism := nCtx.ExecutionContext().CurrentParallelism()
maxParallelism := nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism
availableParallelism = int(maxParallelism - currentParallelism)
} else {
availableParallelism = int(*arrayNode.GetParallelism())
if availableParallelism == 0 {
availableParallelism = len(arrayNodeState.SubNodePhases.GetItems())
}
}

nodeExecutionRequests := make([]*nodeExecutionRequest, 0, availableParallelism)
nodeExecutionRequests := make([]*nodeExecutionRequest, 0, maxParallelism)
currentParallelism := 0
for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() {
if availableParallelism == 0 {
if currentParallelism >= maxParallelism {
break
}

Expand Down Expand Up @@ -315,10 +303,10 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
// TODO - this is a naive implementation of parallelism, if we want to support more
// complex subNodes (ie. dynamics / subworkflows) we need to revisit this so that
// parallelism is handled during subNode evaluations + avoid deadlocks
if useWorkflowParallelism {
if incrementWorkflowParallelism {
nCtx.ExecutionContext().IncrementParallelism()
}
availableParallelism--
currentParallelism++
}

workerErrorCollector := errorcollector.NewErrorMessageCollector()
Expand Down Expand Up @@ -418,6 +406,12 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
// wait until all tasks have completed before declaring success
arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding
}

// if incrementWorkflowParallelism is not set then we need to increment the parallelism by one
// to indicate that the overall ArrayNode is still running
if !incrementWorkflowParallelism && arrayNodeState.Phase == v1alpha1.ArrayNodePhaseExecuting {
nCtx.ExecutionContext().IncrementParallelism()
}
case v1alpha1.ArrayNodePhaseFailing:
if err := a.Abort(ctx, nCtx, "ArrayNodeFailing"); err != nil {
return handler.UnknownTransition, err
Expand Down
32 changes: 20 additions & 12 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ func uint32Ptr(v uint32) *uint32 {

func TestHandleArrayNodePhaseExecuting(t *testing.T) {
ctx := context.Background()

// setting default parallelism behavior on ArrayNode to "hybrid" to test the largest scope of functionality
flyteConfig := config.GetConfig()
flyteConfig.ArrayNode.DefaultParallelismBehavior = config.ParallelismBehaviorHybrid

minSuccessRatio := float32(0.5)

// initialize universal variables
Expand Down Expand Up @@ -511,6 +516,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
incrementParallelismCount: 1,
},
{
name: "StartOneSubNodeParallelism",
Expand All @@ -529,12 +535,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
incrementParallelismCount: 1,
},
{
name: "UtilizeWfParallelismAllSubNodes",
parallelism: nil,
currentWfParallelism: 0,
incrementParallelismCount: 2,
name: "UtilizeWfParallelismAllSubNodes",
parallelism: nil,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
Expand All @@ -550,12 +555,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
currentWfParallelism: 0,
incrementParallelismCount: 2,
},
{
name: "UtilizeWfParallelismSomeSubNodes",
parallelism: nil,
currentWfParallelism: workflowMaxParallelism - 1,
incrementParallelismCount: 1,
name: "UtilizeWfParallelismSomeSubNodes",
parallelism: nil,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
Expand All @@ -570,12 +575,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING},
currentWfParallelism: workflowMaxParallelism - 1,
incrementParallelismCount: 1,
},
{
name: "UtilizeWfParallelismNoSubNodes",
parallelism: nil,
currentWfParallelism: workflowMaxParallelism,
incrementParallelismCount: 0,
name: "UtilizeWfParallelismNoSubNodes",
parallelism: nil,
subNodePhases: []v1alpha1.NodePhase{
v1alpha1.NodePhaseQueued,
v1alpha1.NodePhaseQueued,
Expand All @@ -588,6 +593,8 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{},
currentWfParallelism: workflowMaxParallelism,
incrementParallelismCount: 0,
},
{
name: "StartSubNodesNewAttempts",
Expand All @@ -607,6 +614,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) {
expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting,
expectedTransitionPhase: handler.EPhaseRunning,
expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING},
incrementParallelismCount: 1,
},
{
name: "AllSubNodesSuccedeed",
Expand Down
20 changes: 20 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

Expand Down Expand Up @@ -62,6 +64,24 @@ func constructOutputReferences(ctx context.Context, nCtx interfaces.NodeExecutio
return subDataDir, subOutputDir, nil
}

func inferParallelism(ctx context.Context, parallelism *uint32, parallelismBehavior string, remainingWorkflowParallelism, arrayNodeSize int) (bool, int) {
if parallelism != nil && *parallelism > 0 {
// if parallelism is not defaulted - use it
return false, int(*parallelism)
} else if parallelismBehavior == config.ParallelismBehaviorWorkflow || (parallelism == nil && parallelismBehavior == config.ParallelismBehaviorHybrid) {
// if workflow level parallelism
return true, remainingWorkflowParallelism
} else if parallelismBehavior == config.ParallelismBehaviorUnlimited ||
(parallelism != nil && *parallelism == 0 && parallelismBehavior == config.ParallelismBehaviorHybrid) {
// if unlimited parallelism
return false, arrayNodeSize
}

logger.Warnf(ctx, "unable to infer ArrayNode parallelism configuration for parallelism:%v behavior:%v, defaulting to unlimited parallelism",
parallelism, parallelismBehavior)
return false, arrayNodeSize
}

func isTerminalNodePhase(nodePhase v1alpha1.NodePhase) bool {
return nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseFailed || nodePhase == v1alpha1.NodePhaseTimedOut ||
nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered
Expand Down
Loading

0 comments on commit fa4ad71

Please sign in to comment.