From b0851686683ac6903b97774233c62537158d5090 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 22 Apr 2024 21:19:25 -0500 Subject: [PATCH 1/6] added configuration for arraynode default parallelism behavior Signed-off-by: Daniel Rammer --- .../pkg/controller/config/config.go | 22 +++++++- .../pkg/controller/config/config_flags.go | 3 +- .../controller/config/config_flags_test.go | 30 ++++++++--- .../controller/nodes/array/event_recorder.go | 2 +- .../pkg/controller/nodes/array/handler.go | 50 ++++++++++++++++--- .../pkg/controller/nodes/array/utils.go | 22 ++++++++ .../pkg/controller/nodes/transformers.go | 2 +- 7 files changed, 112 insertions(+), 19 deletions(-) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index 89122da3ed..a00468d24e 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -115,8 +115,12 @@ var ( }, ClusterID: "propeller", CreateFlyteWorkflowCRD: false, - ArrayNodeEventVersion: 0, NodeExecutionWorkerCount: 8, + ArrayNode: ArrayNodeConfig{ + EventVersion: 0, + //DefaultParallelismBehavior: ParallelismBehaviorUnlimited, + DefaultParallelismBehavior: ParallelismBehaviorConfigured, + }, } ) @@ -156,8 +160,8 @@ type Config struct { ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"` ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"` CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"` - ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"` NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"` + ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"` } // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client. @@ -258,6 +262,20 @@ type EventConfig struct { FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."` } +// TODO @hamersaw - docs +type ParallelismBehavior = string + +const ( + ParallelismBehaviorConfigured ParallelismBehavior = "configured" + ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited" + ParallelismBehaviorWorkflow ParallelismBehavior = "workflow" +) + +type ArrayNodeConfig struct { + EventVersion int `json:"event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"` + DefaultParallelismBehavior ParallelismBehavior `json:"default-parallelism-behavior" pflag:",Default parallelism behavior for array nodes"` +} + // GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object. func GetConfig() *Config { return configSection.GetConfig().(*Config) diff --git a/flytepropeller/pkg/controller/config/config_flags.go b/flytepropeller/pkg/controller/config/config_flags.go index b055aad558..ea0b428c2f 100755 --- a/flytepropeller/pkg/controller/config/config_flags.go +++ b/flytepropeller/pkg/controller/config/config_flags.go @@ -108,7 +108,8 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.StringSlice(fmt.Sprintf("%v%v", prefix, "exclude-domain-label"), defaultConfig.ExcludeDomainLabel, "Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "cluster-id"), defaultConfig.ClusterID, "Unique cluster id running this flytepropeller instance with which to annotate execution events") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "create-flyteworkflow-crd"), defaultConfig.CreateFlyteWorkflowCRD, "Enable creation of the FlyteWorkflow CRD on startup") - cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "array-node-event-version"), defaultConfig.ArrayNodeEventVersion, "ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "node-execution-worker-count"), defaultConfig.NodeExecutionWorkerCount, "Number of workers to evaluate node executions, currently only used for array nodes") + cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "array-node-config.event-version"), defaultConfig.ArrayNode.EventVersion, "ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "array-node-config.default-parallelism-behavior"), defaultConfig.ArrayNode.DefaultParallelismBehavior, "Default parallelism behavior for array nodes") return cmdFlags } diff --git a/flytepropeller/pkg/controller/config/config_flags_test.go b/flytepropeller/pkg/controller/config/config_flags_test.go index 6f3c67b652..bce7238f60 100755 --- a/flytepropeller/pkg/controller/config/config_flags_test.go +++ b/flytepropeller/pkg/controller/config/config_flags_test.go @@ -911,28 +911,42 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_array-node-event-version", func(t *testing.T) { + t.Run("Test_node-execution-worker-count", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("array-node-event-version", testValue) - if vInt, err := cmdFlags.GetInt("array-node-event-version"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.ArrayNodeEventVersion) + cmdFlags.Set("node-execution-worker-count", testValue) + if vInt, err := cmdFlags.GetInt("node-execution-worker-count"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.NodeExecutionWorkerCount) } else { assert.FailNow(t, err.Error()) } }) }) - t.Run("Test_node-execution-worker-count", func(t *testing.T) { + t.Run("Test_array-node-config.event-version", func(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "1" - cmdFlags.Set("node-execution-worker-count", testValue) - if vInt, err := cmdFlags.GetInt("node-execution-worker-count"); err == nil { - testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.NodeExecutionWorkerCount) + cmdFlags.Set("array-node-config.event-version", testValue) + if vInt, err := cmdFlags.GetInt("array-node-config.event-version"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vInt), &actual.ArrayNode.EventVersion) + + } else { + assert.FailNow(t, err.Error()) + } + }) + }) + t.Run("Test_array-node-config.default-parallelism-behavior", func(t *testing.T) { + + t.Run("Override", func(t *testing.T) { + testValue := "1" + + cmdFlags.Set("array-node-config.default-parallelism-behavior", testValue) + if vString, err := cmdFlags.GetString("array-node-config.default-parallelism-behavior"); err == nil { + testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.ArrayNode.DefaultParallelismBehavior) } else { assert.FailNow(t, err.Error()) diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index c2e0c96ed8..35120c069e 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -202,7 +202,7 @@ func (*passThroughEventRecorder) finalizeRequired(ctx context.Context) bool { } func newArrayEventRecorder(eventRecorder interfaces.EventRecorder) arrayEventRecorder { - if config.GetConfig().ArrayNodeEventVersion == 0 { + if config.GetConfig().ArrayNode.EventVersion == 0 { return &externalResourcesEventRecorder{ EventRecorder: eventRecorder, } diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index cb5787053d..004f5a0ac4 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -253,7 +253,31 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu case v1alpha1.ArrayNodePhaseExecuting: // process array node subNodes - availableParallelism := 0 + /*incrementParallelism := false + parallelism := -1 + if arrayNode.GetParallelism() != nil && *arrayNode.GetParallelism() > 0 { + // if parallelism is not defaulted - use it + parallelism = int(*arrayNode.GetParallelism()) + } else { + // otherwise use either workflow or unlimited + parallelismBehavior := config.GetConfig().ArrayNode.DefaultParallelismBehavior + if parallelismBehavior == config.ParallelismBehaviorWorkflow || (arrayNode.GetParallelism() == nil && parallelismBehavior == config.ParallelismBehaviorConfigured) { + incrementParallelism = true + parallelism = int(nCtx.ExecutionContext().MaxParallelism() - nCtx.ExecutionContext().CurrentParallelism()) + + } else if parallelismBehavior == config.ParallelismBehaviorUnlimited + || (arrayNode.GetParallelism() != nil && arrayNode.GetParallelism() == 0 && parallelismBehavior == config.ParallelismBehaviorConfigured) { + + parallelism = len(arrayNodeState.SubNodePhases.GetItems()) + } else { + // TODO - unsupported ArrayNode parallelism behavior? + } + }*/ + + incrementWorkflowParallelism, maxParallelism := computeParallelism(ctx, nCtx, &arrayNodeState, + arrayNode.GetParallelism(), config.GetConfig().ArrayNode.DefaultParallelismBehavior) + + /*availableParallelism := 0 // using the workflow's parallelism if the array node parallelism is not set useWorkflowParallelism := arrayNode.GetParallelism() == nil if useWorkflowParallelism { @@ -267,13 +291,18 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu if availableParallelism == 0 { availableParallelism = len(arrayNodeState.SubNodePhases.GetItems()) } - } + }*/ - nodeExecutionRequests := make([]*nodeExecutionRequest, 0, availableParallelism) + //nodeExecutionRequests := make([]*nodeExecutionRequest, 0, availableParallelism) + nodeExecutionRequests := make([]*nodeExecutionRequest, 0, maxParallelism) + currentParallelism := 0 for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { - if availableParallelism == 0 { + if currentParallelism >= maxParallelism { break } + /*if availableParallelism == 0 { + break + }*/ nodePhase := v1alpha1.NodePhase(nodePhaseUint64) taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) @@ -315,10 +344,19 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // TODO - this is a naive implementation of parallelism, if we want to support more // complex subNodes (ie. dynamics / subworkflows) we need to revisit this so that // parallelism is handled during subNode evaluations + avoid deadlocks - if useWorkflowParallelism { + /*if useWorkflowParallelism { + nCtx.ExecutionContext().IncrementParallelism() + }*/ + //availableParallelism-- + if incrementWorkflowParallelism { nCtx.ExecutionContext().IncrementParallelism() } - availableParallelism-- + currentParallelism++ + } + + // TODO @hamersaw - docs + if !incrementWorkflowParallelism { + nCtx.ExecutionContext().IncrementParallelism() } workerErrorCollector := errorcollector.NewErrorMessageCollector() diff --git a/flytepropeller/pkg/controller/nodes/array/utils.go b/flytepropeller/pkg/controller/nodes/array/utils.go index 342ccca3f6..a083dba0ec 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils.go +++ b/flytepropeller/pkg/controller/nodes/array/utils.go @@ -7,6 +7,8 @@ import ( idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex" @@ -66,3 +68,23 @@ func isTerminalNodePhase(nodePhase v1alpha1.NodePhase) bool { return nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseFailed || nodePhase == v1alpha1.NodePhaseTimedOut || nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered } + +func computeParallelism(_ context.Context, nCtx interfaces.NodeExecutionContext, + arrayNodeState *handler.ArrayNodeState, parallelism *uint32, parallelismBehavior string) (bool, int) { + + if parallelism != nil && *parallelism > 0 { + // if parallelism is not defaulted - use it + return false, int(*parallelism) + } else { + // otherwise use either workflow or unlimited + if parallelismBehavior == config.ParallelismBehaviorWorkflow || (parallelism == nil && parallelismBehavior == config.ParallelismBehaviorConfigured) { + return true, int(nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism - nCtx.ExecutionContext().CurrentParallelism()) + } else if parallelismBehavior == config.ParallelismBehaviorUnlimited || + (parallelism != nil && *parallelism == 0 && parallelismBehavior == config.ParallelismBehaviorConfigured) { + return false, len(arrayNodeState.SubNodePhases.GetItems()) + } + } + + // TODO @hamersaw - log unreachable? + return false, len(arrayNodeState.SubNodePhases.GetItems()) +} diff --git a/flytepropeller/pkg/controller/nodes/transformers.go b/flytepropeller/pkg/controller/nodes/transformers.go index b034c5b90f..1c911b44f3 100644 --- a/flytepropeller/pkg/controller/nodes/transformers.go +++ b/flytepropeller/pkg/controller/nodes/transformers.go @@ -179,7 +179,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, nev.IsParent = true } else if node.GetKind() == v1alpha1.NodeKindArray { nev.IsArray = true - if config.GetConfig().ArrayNodeEventVersion == 1 { + if config.GetConfig().ArrayNode.EventVersion == 1 { nev.IsParent = true } } else if dynamicNodePhase != v1alpha1.DynamicNodePhaseNone { From e57eeb46f24f08be626b634638a5bea8a4d1d036 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 22 Apr 2024 23:38:30 -0500 Subject: [PATCH 2/6] added unit tests and fixed linter Signed-off-by: Daniel Rammer --- .../pkg/controller/config/config.go | 3 +- .../pkg/controller/nodes/array/handler.go | 15 +-- .../controller/nodes/array/handler_test.go | 32 ++++-- .../pkg/controller/nodes/array/utils.go | 34 +++--- .../pkg/controller/nodes/array/utils_test.go | 107 ++++++++++++++++++ 5 files changed, 151 insertions(+), 40 deletions(-) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index a00468d24e..a27bedc557 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -118,8 +118,7 @@ var ( NodeExecutionWorkerCount: 8, ArrayNode: ArrayNodeConfig{ EventVersion: 0, - //DefaultParallelismBehavior: ParallelismBehaviorUnlimited, - DefaultParallelismBehavior: ParallelismBehaviorConfigured, + DefaultParallelismBehavior: ParallelismBehaviorUnlimited, }, } ) diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 004f5a0ac4..7cac5204d2 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -274,8 +274,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu } }*/ - incrementWorkflowParallelism, maxParallelism := computeParallelism(ctx, nCtx, &arrayNodeState, - arrayNode.GetParallelism(), config.GetConfig().ArrayNode.DefaultParallelismBehavior) + remainingWorkflowParallelism := int(nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism - nCtx.ExecutionContext().CurrentParallelism()) + incrementWorkflowParallelism, maxParallelism := identifyParallelism(arrayNode.GetParallelism(), + config.GetConfig().ArrayNode.DefaultParallelismBehavior, remainingWorkflowParallelism, len(arrayNodeState.SubNodePhases.GetItems())) /*availableParallelism := 0 // using the workflow's parallelism if the array node parallelism is not set @@ -354,11 +355,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu currentParallelism++ } - // TODO @hamersaw - docs - if !incrementWorkflowParallelism { - nCtx.ExecutionContext().IncrementParallelism() - } - workerErrorCollector := errorcollector.NewErrorMessageCollector() subNodeFailureCollector := errorcollector.NewErrorMessageCollector() for i, nodeExecutionRequest := range nodeExecutionRequests { @@ -456,6 +452,11 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // wait until all tasks have completed before declaring success arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding } + + // TODO @hamersaw - docs + if !incrementWorkflowParallelism && arrayNodeState.Phase == v1alpha1.ArrayNodePhaseExecuting { + nCtx.ExecutionContext().IncrementParallelism() + } case v1alpha1.ArrayNodePhaseFailing: if err := a.Abort(ctx, nCtx, "ArrayNodeFailing"); err != nil { return handler.UnknownTransition, err diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index f7863731b4..1175097c01 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -461,6 +461,11 @@ func uint32Ptr(v uint32) *uint32 { func TestHandleArrayNodePhaseExecuting(t *testing.T) { ctx := context.Background() + + // setting default parallelism behavior on ArrayNode to "configured" to test the largest scope of functionality + flyteConfig := config.GetConfig() + flyteConfig.ArrayNode.DefaultParallelismBehavior = config.ParallelismBehaviorConfigured + minSuccessRatio := float32(0.5) // initialize universal variables @@ -511,6 +516,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, + incrementParallelismCount: 1, }, { name: "StartOneSubNodeParallelism", @@ -529,12 +535,11 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, + incrementParallelismCount: 1, }, { - name: "UtilizeWfParallelismAllSubNodes", - parallelism: nil, - currentWfParallelism: 0, - incrementParallelismCount: 2, + name: "UtilizeWfParallelismAllSubNodes", + parallelism: nil, subNodePhases: []v1alpha1.NodePhase{ v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseQueued, @@ -550,12 +555,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, + currentWfParallelism: 0, + incrementParallelismCount: 2, }, { - name: "UtilizeWfParallelismSomeSubNodes", - parallelism: nil, - currentWfParallelism: workflowMaxParallelism - 1, - incrementParallelismCount: 1, + name: "UtilizeWfParallelismSomeSubNodes", + parallelism: nil, subNodePhases: []v1alpha1.NodePhase{ v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseQueued, @@ -570,12 +575,12 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING}, + currentWfParallelism: workflowMaxParallelism - 1, + incrementParallelismCount: 1, }, { - name: "UtilizeWfParallelismNoSubNodes", - parallelism: nil, - currentWfParallelism: workflowMaxParallelism, - incrementParallelismCount: 0, + name: "UtilizeWfParallelismNoSubNodes", + parallelism: nil, subNodePhases: []v1alpha1.NodePhase{ v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseQueued, @@ -588,6 +593,8 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{}, + currentWfParallelism: workflowMaxParallelism, + incrementParallelismCount: 0, }, { name: "StartSubNodesNewAttempts", @@ -607,6 +614,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { expectedArrayNodePhase: v1alpha1.ArrayNodePhaseExecuting, expectedTransitionPhase: handler.EPhaseRunning, expectedExternalResourcePhases: []idlcore.TaskExecution_Phase{idlcore.TaskExecution_RUNNING, idlcore.TaskExecution_RUNNING}, + incrementParallelismCount: 1, }, { name: "AllSubNodesSuccedeed", diff --git a/flytepropeller/pkg/controller/nodes/array/utils.go b/flytepropeller/pkg/controller/nodes/array/utils.go index a083dba0ec..6757d4f24e 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils.go +++ b/flytepropeller/pkg/controller/nodes/array/utils.go @@ -8,7 +8,6 @@ import ( idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config" - "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex" @@ -64,27 +63,24 @@ func constructOutputReferences(ctx context.Context, nCtx interfaces.NodeExecutio return subDataDir, subOutputDir, nil } -func isTerminalNodePhase(nodePhase v1alpha1.NodePhase) bool { - return nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseFailed || nodePhase == v1alpha1.NodePhaseTimedOut || - nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered -} - -func computeParallelism(_ context.Context, nCtx interfaces.NodeExecutionContext, - arrayNodeState *handler.ArrayNodeState, parallelism *uint32, parallelismBehavior string) (bool, int) { - +func identifyParallelism(parallelism *uint32, parallelismBehavior string, remainingWorkflowParallelism, arrayNodeSize int) (bool, int) { if parallelism != nil && *parallelism > 0 { // if parallelism is not defaulted - use it return false, int(*parallelism) - } else { - // otherwise use either workflow or unlimited - if parallelismBehavior == config.ParallelismBehaviorWorkflow || (parallelism == nil && parallelismBehavior == config.ParallelismBehaviorConfigured) { - return true, int(nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism - nCtx.ExecutionContext().CurrentParallelism()) - } else if parallelismBehavior == config.ParallelismBehaviorUnlimited || - (parallelism != nil && *parallelism == 0 && parallelismBehavior == config.ParallelismBehaviorConfigured) { - return false, len(arrayNodeState.SubNodePhases.GetItems()) - } + } else if parallelismBehavior == config.ParallelismBehaviorWorkflow || (parallelism == nil && parallelismBehavior == config.ParallelismBehaviorConfigured) { + // if workflow level parallelism + return true, remainingWorkflowParallelism + } else if parallelismBehavior == config.ParallelismBehaviorUnlimited || + (parallelism != nil && *parallelism == 0 && parallelismBehavior == config.ParallelismBehaviorConfigured) { + // if unlimited parallelism + return false, arrayNodeSize } - // TODO @hamersaw - log unreachable? - return false, len(arrayNodeState.SubNodePhases.GetItems()) + // TODO @hamersaw - log unreachable? defaulting to unlimited parallelism + return false, arrayNodeSize +} + +func isTerminalNodePhase(nodePhase v1alpha1.NodePhase) bool { + return nodePhase == v1alpha1.NodePhaseSucceeded || nodePhase == v1alpha1.NodePhaseFailed || nodePhase == v1alpha1.NodePhaseTimedOut || + nodePhase == v1alpha1.NodePhaseSkipped || nodePhase == v1alpha1.NodePhaseRecovered } diff --git a/flytepropeller/pkg/controller/nodes/array/utils_test.go b/flytepropeller/pkg/controller/nodes/array/utils_test.go index fde3d0fa80..38f092990b 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils_test.go +++ b/flytepropeller/pkg/controller/nodes/array/utils_test.go @@ -34,3 +34,110 @@ func TestAppendLiteral(t *testing.T) { assert.Equal(t, 2, len(collection.Collection.Literals)) } } + +func TestIdentifyParallelism(t *testing.T) { + zero := uint32(0) + one := uint32(1) + + tests := []struct { + name string + parallelism *uint32 + parallelismBehavior string + remainingParallelism int + arrayNodeSize int + expectedIncrement bool + expectedMaxParallelism int + }{ + { + name: "NilParallelismWorkflowBehavior", + parallelism: nil, + parallelismBehavior: "workflow", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: true, + expectedMaxParallelism: 2, + }, + { + name: "NilParallelismConfiguredBehavior", + parallelism: nil, + parallelismBehavior: "configured", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: true, + expectedMaxParallelism: 2, + }, + { + name: "NilParallelismUnlimitedBehavior", + parallelism: nil, + parallelismBehavior: "unlimited", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: false, + expectedMaxParallelism: 3, + }, + { + name: "ZeroParallelismWorkflowBehavior", + parallelism: &zero, + parallelismBehavior: "workflow", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: true, + expectedMaxParallelism: 2, + }, + { + name: "ZeroParallelismConfiguredBehavior", + parallelism: &zero, + parallelismBehavior: "configured", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: false, + expectedMaxParallelism: 3, + }, + { + name: "ZeroParallelismUnlimitedBehavior", + parallelism: &zero, + parallelismBehavior: "unlimited", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: false, + expectedMaxParallelism: 3, + }, + { + name: "OneParallelismWorkflowBehavior", + parallelism: &one, + parallelismBehavior: "workflow", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: false, + expectedMaxParallelism: 1, + }, + { + name: "OneParallelismConfiguredBehavior", + parallelism: &one, + parallelismBehavior: "configured", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: false, + expectedMaxParallelism: 1, + }, + { + name: "OneParallelismUnlimitedBehavior", + parallelism: &one, + parallelismBehavior: "unlimited", + remainingParallelism: 2, + arrayNodeSize: 3, + expectedIncrement: false, + expectedMaxParallelism: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + increment, maxParallelism := identifyParallelism(tt.parallelism, tt.parallelismBehavior, tt.remainingParallelism, tt.arrayNodeSize) + assert.Equal(t, tt.expectedIncrement, increment) + assert.Equal(t, tt.expectedMaxParallelism, maxParallelism) + }) + } + + //func identifyParallelism(parallelism *uint32, parallelismBehavior string, remainingWorkflowParallelism, arrayNodeSize int) (bool, int) { +} From 4dc98c94792baaf29f12cff49043458b2c38a3c2 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 23 Apr 2024 09:22:52 -0500 Subject: [PATCH 3/6] cleanup / docs Signed-off-by: Daniel Rammer --- .../pkg/controller/config/config.go | 17 +++++-- .../pkg/controller/nodes/array/handler.go | 51 ++----------------- .../controller/nodes/array/handler_test.go | 2 +- .../pkg/controller/nodes/array/utils.go | 10 ++-- .../pkg/controller/nodes/array/utils_test.go | 8 +-- 5 files changed, 27 insertions(+), 61 deletions(-) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index a27bedc557..b84355fc0d 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -261,13 +261,22 @@ type EventConfig struct { FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."` } -// TODO @hamersaw - docs +// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default type ParallelismBehavior = string const ( - ParallelismBehaviorConfigured ParallelismBehavior = "configured" - ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited" - ParallelismBehaviorWorkflow ParallelismBehavior = "workflow" + // ParallelismBehaviorHybrid means that ArrayNode will adhere to the parallelism defined in the + // ArrayNode exactly. This means `nil` will use the workflow parallelism, and 0 will have + // unlimited parallelism. + ParallelismBehaviorHybrid ParallelismBehavior = "hybrid" + + // ParallelismBehaviorUnlimited means that ArrayNode subNodes will be evaluated with unlimited + // parallelism for both nil and 0. + ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited" + + // ParallelismBehaviorWorkflow means that ArrayNode subNodes will be evaluated using the + // configured workflow parallelism for both nil and 0. + ParallelismBehaviorWorkflow ParallelismBehavior = "workflow" ) type ArrayNodeConfig struct { diff --git a/flytepropeller/pkg/controller/nodes/array/handler.go b/flytepropeller/pkg/controller/nodes/array/handler.go index 7cac5204d2..0f9e95f19b 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler.go +++ b/flytepropeller/pkg/controller/nodes/array/handler.go @@ -252,58 +252,16 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.Phase = v1alpha1.ArrayNodePhaseExecuting case v1alpha1.ArrayNodePhaseExecuting: // process array node subNodes - - /*incrementParallelism := false - parallelism := -1 - if arrayNode.GetParallelism() != nil && *arrayNode.GetParallelism() > 0 { - // if parallelism is not defaulted - use it - parallelism = int(*arrayNode.GetParallelism()) - } else { - // otherwise use either workflow or unlimited - parallelismBehavior := config.GetConfig().ArrayNode.DefaultParallelismBehavior - if parallelismBehavior == config.ParallelismBehaviorWorkflow || (arrayNode.GetParallelism() == nil && parallelismBehavior == config.ParallelismBehaviorConfigured) { - incrementParallelism = true - parallelism = int(nCtx.ExecutionContext().MaxParallelism() - nCtx.ExecutionContext().CurrentParallelism()) - - } else if parallelismBehavior == config.ParallelismBehaviorUnlimited - || (arrayNode.GetParallelism() != nil && arrayNode.GetParallelism() == 0 && parallelismBehavior == config.ParallelismBehaviorConfigured) { - - parallelism = len(arrayNodeState.SubNodePhases.GetItems()) - } else { - // TODO - unsupported ArrayNode parallelism behavior? - } - }*/ - remainingWorkflowParallelism := int(nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism - nCtx.ExecutionContext().CurrentParallelism()) - incrementWorkflowParallelism, maxParallelism := identifyParallelism(arrayNode.GetParallelism(), + incrementWorkflowParallelism, maxParallelism := inferParallelism(ctx, arrayNode.GetParallelism(), config.GetConfig().ArrayNode.DefaultParallelismBehavior, remainingWorkflowParallelism, len(arrayNodeState.SubNodePhases.GetItems())) - /*availableParallelism := 0 - // using the workflow's parallelism if the array node parallelism is not set - useWorkflowParallelism := arrayNode.GetParallelism() == nil - if useWorkflowParallelism { - // greedily take all available slots - // TODO: This will need to be re-evaluated if we want to support dynamics & sub_workflows - currentParallelism := nCtx.ExecutionContext().CurrentParallelism() - maxParallelism := nCtx.ExecutionContext().GetExecutionConfig().MaxParallelism - availableParallelism = int(maxParallelism - currentParallelism) - } else { - availableParallelism = int(*arrayNode.GetParallelism()) - if availableParallelism == 0 { - availableParallelism = len(arrayNodeState.SubNodePhases.GetItems()) - } - }*/ - - //nodeExecutionRequests := make([]*nodeExecutionRequest, 0, availableParallelism) nodeExecutionRequests := make([]*nodeExecutionRequest, 0, maxParallelism) currentParallelism := 0 for i, nodePhaseUint64 := range arrayNodeState.SubNodePhases.GetItems() { if currentParallelism >= maxParallelism { break } - /*if availableParallelism == 0 { - break - }*/ nodePhase := v1alpha1.NodePhase(nodePhaseUint64) taskPhase := int(arrayNodeState.SubNodeTaskPhases.GetItem(i)) @@ -345,10 +303,6 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu // TODO - this is a naive implementation of parallelism, if we want to support more // complex subNodes (ie. dynamics / subworkflows) we need to revisit this so that // parallelism is handled during subNode evaluations + avoid deadlocks - /*if useWorkflowParallelism { - nCtx.ExecutionContext().IncrementParallelism() - }*/ - //availableParallelism-- if incrementWorkflowParallelism { nCtx.ExecutionContext().IncrementParallelism() } @@ -453,7 +407,8 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu arrayNodeState.Phase = v1alpha1.ArrayNodePhaseSucceeding } - // TODO @hamersaw - docs + // if incrementWorkflowParallelism is not set then we need to increment the parallelism by one + // to indicate that the overall ArrayNode is still running if !incrementWorkflowParallelism && arrayNodeState.Phase == v1alpha1.ArrayNodePhaseExecuting { nCtx.ExecutionContext().IncrementParallelism() } diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 1175097c01..23d5bfd2d2 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -464,7 +464,7 @@ func TestHandleArrayNodePhaseExecuting(t *testing.T) { // setting default parallelism behavior on ArrayNode to "configured" to test the largest scope of functionality flyteConfig := config.GetConfig() - flyteConfig.ArrayNode.DefaultParallelismBehavior = config.ParallelismBehaviorConfigured + flyteConfig.ArrayNode.DefaultParallelismBehavior = config.ParallelismBehaviorHybrid minSuccessRatio := float32(0.5) diff --git a/flytepropeller/pkg/controller/nodes/array/utils.go b/flytepropeller/pkg/controller/nodes/array/utils.go index 6757d4f24e..34e304b662 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils.go +++ b/flytepropeller/pkg/controller/nodes/array/utils.go @@ -12,6 +12,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/codex" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task/k8s" + "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -63,20 +64,21 @@ func constructOutputReferences(ctx context.Context, nCtx interfaces.NodeExecutio return subDataDir, subOutputDir, nil } -func identifyParallelism(parallelism *uint32, parallelismBehavior string, remainingWorkflowParallelism, arrayNodeSize int) (bool, int) { +func inferParallelism(ctx context.Context, parallelism *uint32, parallelismBehavior string, remainingWorkflowParallelism, arrayNodeSize int) (bool, int) { if parallelism != nil && *parallelism > 0 { // if parallelism is not defaulted - use it return false, int(*parallelism) - } else if parallelismBehavior == config.ParallelismBehaviorWorkflow || (parallelism == nil && parallelismBehavior == config.ParallelismBehaviorConfigured) { + } else if parallelismBehavior == config.ParallelismBehaviorWorkflow || (parallelism == nil && parallelismBehavior == config.ParallelismBehaviorHybrid) { // if workflow level parallelism return true, remainingWorkflowParallelism } else if parallelismBehavior == config.ParallelismBehaviorUnlimited || - (parallelism != nil && *parallelism == 0 && parallelismBehavior == config.ParallelismBehaviorConfigured) { + (parallelism != nil && *parallelism == 0 && parallelismBehavior == config.ParallelismBehaviorHybrid) { // if unlimited parallelism return false, arrayNodeSize } - // TODO @hamersaw - log unreachable? defaulting to unlimited parallelism + logger.Warnf(ctx, "unable to infer ArrayNode parallelism configuration for parallelism:%v behavior:%v, defaulting to unlimited parallelism", + parallelism, parallelismBehavior) return false, arrayNodeSize } diff --git a/flytepropeller/pkg/controller/nodes/array/utils_test.go b/flytepropeller/pkg/controller/nodes/array/utils_test.go index 38f092990b..388783b0dc 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils_test.go +++ b/flytepropeller/pkg/controller/nodes/array/utils_test.go @@ -1,6 +1,7 @@ package array import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -35,7 +36,8 @@ func TestAppendLiteral(t *testing.T) { } } -func TestIdentifyParallelism(t *testing.T) { +func TestInferParallelism(t *testing.T) { + ctx := context.TODO() zero := uint32(0) one := uint32(1) @@ -133,11 +135,9 @@ func TestIdentifyParallelism(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - increment, maxParallelism := identifyParallelism(tt.parallelism, tt.parallelismBehavior, tt.remainingParallelism, tt.arrayNodeSize) + increment, maxParallelism := inferParallelism(ctx, tt.parallelism, tt.parallelismBehavior, tt.remainingParallelism, tt.arrayNodeSize) assert.Equal(t, tt.expectedIncrement, increment) assert.Equal(t, tt.expectedMaxParallelism, maxParallelism) }) } - - //func identifyParallelism(parallelism *uint32, parallelismBehavior string, remainingWorkflowParallelism, arrayNodeSize int) (bool, int) { } From 0b03de7db518f8a3697c7232f00d7d13b448f3d8 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 23 Apr 2024 09:34:48 -0500 Subject: [PATCH 4/6] fixed ytpo Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytepropeller/pkg/controller/nodes/array/handler_test.go b/flytepropeller/pkg/controller/nodes/array/handler_test.go index 23d5bfd2d2..ba0815fee6 100644 --- a/flytepropeller/pkg/controller/nodes/array/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/array/handler_test.go @@ -462,7 +462,7 @@ func uint32Ptr(v uint32) *uint32 { func TestHandleArrayNodePhaseExecuting(t *testing.T) { ctx := context.Background() - // setting default parallelism behavior on ArrayNode to "configured" to test the largest scope of functionality + // setting default parallelism behavior on ArrayNode to "hybrid" to test the largest scope of functionality flyteConfig := config.GetConfig() flyteConfig.ArrayNode.DefaultParallelismBehavior = config.ParallelismBehaviorHybrid From f6021e0542c6277dee66bfcbf4d44d9c429351a2 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 23 Apr 2024 12:19:49 -0500 Subject: [PATCH 5/6] docs update Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/config/config.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go index b84355fc0d..419386eddd 100644 --- a/flytepropeller/pkg/controller/config/config.go +++ b/flytepropeller/pkg/controller/config/config.go @@ -271,11 +271,13 @@ const ( ParallelismBehaviorHybrid ParallelismBehavior = "hybrid" // ParallelismBehaviorUnlimited means that ArrayNode subNodes will be evaluated with unlimited - // parallelism for both nil and 0. + // parallelism for both nil and 0. If a non-default (ie. nil / zero) parallelism is set, then + // ArrayNode will adhere to that value. ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited" // ParallelismBehaviorWorkflow means that ArrayNode subNodes will be evaluated using the - // configured workflow parallelism for both nil and 0. + // configured workflow parallelism for both nil and 0. If a non-default (ie. nil / zero) + // parallelism is set, then ArrayNode will adhere to that value. ParallelismBehaviorWorkflow ParallelismBehavior = "workflow" ) From ceae1422eae168b017e821815093e3d930869c67 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Tue, 23 Apr 2024 15:46:21 -0500 Subject: [PATCH 6/6] fixed unit tests Signed-off-by: Daniel Rammer --- .../pkg/controller/nodes/array/utils_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/utils_test.go b/flytepropeller/pkg/controller/nodes/array/utils_test.go index 388783b0dc..2b2c030cd6 100644 --- a/flytepropeller/pkg/controller/nodes/array/utils_test.go +++ b/flytepropeller/pkg/controller/nodes/array/utils_test.go @@ -60,9 +60,9 @@ func TestInferParallelism(t *testing.T) { expectedMaxParallelism: 2, }, { - name: "NilParallelismConfiguredBehavior", + name: "NilParallelismHybridBehavior", parallelism: nil, - parallelismBehavior: "configured", + parallelismBehavior: "hybrid", remainingParallelism: 2, arrayNodeSize: 3, expectedIncrement: true, @@ -87,9 +87,9 @@ func TestInferParallelism(t *testing.T) { expectedMaxParallelism: 2, }, { - name: "ZeroParallelismConfiguredBehavior", + name: "ZeroParallelismHybridBehavior", parallelism: &zero, - parallelismBehavior: "configured", + parallelismBehavior: "hybrid", remainingParallelism: 2, arrayNodeSize: 3, expectedIncrement: false, @@ -114,9 +114,9 @@ func TestInferParallelism(t *testing.T) { expectedMaxParallelism: 1, }, { - name: "OneParallelismConfiguredBehavior", + name: "OneParallelismHybridBehavior", parallelism: &one, - parallelismBehavior: "configured", + parallelismBehavior: "hybrid", remainingParallelism: 2, arrayNodeSize: 3, expectedIncrement: false,