Skip to content

Commit

Permalink
cherry pick arraynode parallelism config (#229)
Browse files Browse the repository at this point in the history
* Feature/array node workflow parallelism (#5062)

* update arraynode proto parallelism field to varint compatible int64

Signed-off-by: Paul Dittamo <[email protected]>

* have array nodes utilize workflow parallelism

Signed-off-by: Paul Dittamo <[email protected]>

* return if available parallelism is 0

Signed-off-by: Paul Dittamo <[email protected]>

* unit test

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>

* enable parallelism to be set to nil for array node (#5214)

* enable parallelism to be set to nil for array node

Signed-off-by: Paul Dittamo <[email protected]>

* unit test

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>

* added configuration for arraynode default parallelism behavior (#5268)

* added configuration for arraynode default parallelism behavior

Signed-off-by: Daniel Rammer <[email protected]>

* added unit tests and fixed linter

Signed-off-by: Daniel Rammer <[email protected]>

* cleanup / docs

Signed-off-by: Daniel Rammer <[email protected]>

* fixed typo

Signed-off-by: Daniel Rammer <[email protected]>

* docs update

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: Daniel Rammer <[email protected]>
Co-authored-by: Paul Dittamo <[email protected]>
  • Loading branch information
hamersaw and pvditt authored Apr 24, 2024
1 parent 4ca6237 commit 91dc63f
Show file tree
Hide file tree
Showing 23 changed files with 650 additions and 282 deletions.
22 changes: 14 additions & 8 deletions flyteidl/gen/pb-es/flyteidl/core/workflow_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

356 changes: 191 additions & 165 deletions flyteidl/gen/pb-go/flyteidl/core/workflow.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions flyteidl/gen/pb-js/flyteidl.d.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 14 additions & 1 deletion flyteidl/gen/pb-js/flyteidl.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 42 additions & 41 deletions flyteidl/gen/pb_python/flyteidl/core/workflow_pb2.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions flyteidl/gen/pb_python/flyteidl/core/workflow_pb2.pyi

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 12 additions & 6 deletions flyteidl/gen/pb_rust/flyteidl.core.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 8 additions & 5 deletions flyteidl/protos/flyteidl/core/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "flyteidl/core/tasks.proto";
import "flyteidl/core/types.proto";
import "flyteidl/core/security.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/wrappers.proto";

// Defines a condition and the execution unit that should be executed if the condition is satisfied.
message IfBlock {
Expand Down Expand Up @@ -114,11 +115,13 @@ message ArrayNode {
// node is the sub-node that will be executed for each element in the array.
Node node = 1;

// parallelism defines the minimum number of instances to bring up concurrently at any given
// point. Note that this is an optimistic restriction and that, due to network partitioning or
// other failures, the actual number of currently running instances might be more. This has to
// be a positive number if assigned. Default value is size.
uint32 parallelism = 2;
oneof parallelism_option {
// parallelism defines the minimum number of instances to bring up concurrently at any given
// point. Note that this is an optimistic restriction and that, due to network partitioning or
// other failures, the actual number of currently running instances might be more. This has to
// be a positive number if assigned. Default value is size.
uint32 parallelism = 2;
}

oneof success_criteria {
// min_successes is an absolute number of the minimum number of successful completions of
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package v1alpha1

type ArrayNodeSpec struct {
SubNodeSpec *NodeSpec
Parallelism uint32
Parallelism *uint32
MinSuccesses *uint32
MinSuccessRatio *float32
}
Expand All @@ -11,7 +11,7 @@ func (a *ArrayNodeSpec) GetSubNodeSpec() *NodeSpec {
return a.SubNodeSpec
}

func (a *ArrayNodeSpec) GetParallelism() uint32 {
func (a *ArrayNodeSpec) GetParallelism() *uint32 {
return a.Parallelism
}

Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ func TestArrayNodeSpec_GetSubNodeSpec(t *testing.T) {
func TestArrayNodeSpec_GetParallelism(t *testing.T) {
parallelism := uint32(5)
arrayNodeSpec := ArrayNodeSpec{
Parallelism: parallelism,
Parallelism: &parallelism,
}

if arrayNodeSpec.GetParallelism() != parallelism {
if arrayNodeSpec.GetParallelism() != &parallelism {
t.Errorf("Expected %d, but got %d", parallelism, arrayNodeSpec.GetParallelism())
}
}
Expand Down
2 changes: 1 addition & 1 deletion flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ type ExecutableGateNode interface {

type ExecutableArrayNode interface {
GetSubNodeSpec() *NodeSpec
GetParallelism() uint32
GetParallelism() *uint32
GetMinSuccesses() *uint32
GetMinSuccessRatio() *float32
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion flytepropeller/pkg/compiler/transformers/k8s/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,17 @@ func buildNodeSpec(n *core.Node, tasks []*core.CompiledTask, errs errors.Compile
return nil, ok
}

var parallelism *uint32
switch x := arrayNode.GetParallelismOption().(type) {
case *core.ArrayNode_Parallelism:
parallelism = &x.Parallelism
}

// build ArrayNode
nodeSpec.Kind = v1alpha1.NodeKindArray
nodeSpec.ArrayNode = &v1alpha1.ArrayNodeSpec{
SubNodeSpec: subNodeSpecs[0],
Parallelism: arrayNode.Parallelism,
Parallelism: parallelism,
}

switch successCriteria := arrayNode.SuccessCriteria.(type) {
Expand Down
33 changes: 32 additions & 1 deletion flytepropeller/pkg/compiler/transformers/k8s/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,45 @@ func TestBuildNodeSpec(t *testing.T) {
},
},
},
Parallelism: 10,
ParallelismOption: &core.ArrayNode_Parallelism{
Parallelism: 10,
},
SuccessCriteria: &core.ArrayNode_MinSuccessRatio{
MinSuccessRatio: 0.5,
},
},
}

mustBuild(t, n, 1, errs.NewScope())
specs, ok := buildNodeSpec(n.GetCoreNode(), tasks, errs)
assert.True(t, ok)
assert.Len(t, specs, 1)
assert.Equal(t, *specs[0].ArrayNode.Parallelism, uint32(10))

n.Node.Target = &core.Node_ArrayNode{
ArrayNode: &core.ArrayNode{
Node: &core.Node{
Id: "foo",
Target: &core.Node_TaskNode{
TaskNode: &core.TaskNode{
Reference: &core.TaskNode_ReferenceId{
ReferenceId: &core.Identifier{Name: "ref_1"},
},
},
},
},
ParallelismOption: nil,
SuccessCriteria: &core.ArrayNode_MinSuccessRatio{
MinSuccessRatio: 0.5,
},
},
}

mustBuild(t, n, 1, errs.NewScope())
specs, ok = buildNodeSpec(n.GetCoreNode(), tasks, errs)
assert.True(t, ok)
assert.Len(t, specs, 1)
assert.Nil(t, specs[0].ArrayNode.Parallelism)
})
}

Expand Down
32 changes: 30 additions & 2 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,16 @@ var (
},
ClusterID: "propeller",
CreateFlyteWorkflowCRD: false,
ArrayNodeEventVersion: 0,
NodeExecutionWorkerCount: 8,
AcceleratedInputs: AcceleratedInputs{
Enabled: false,
LocalPathPrefix: "/union-persistent-data",
VolumePath: "/mnt/k8s-disks/0/union-persistent-data",
},
ArrayNode: ArrayNodeConfig{
EventVersion: 0,
DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
},
}
)

Expand Down Expand Up @@ -168,9 +171,9 @@ type Config struct {
ExcludeDomainLabel []string `json:"exclude-domain-label" pflag:",Exclude the specified domain label from the k8s FlyteWorkflow CRD label selector"`
ClusterID string `json:"cluster-id" pflag:",Unique cluster id running this flytepropeller instance with which to annotate execution events"`
CreateFlyteWorkflowCRD bool `json:"create-flyteworkflow-crd" pflag:",Enable creation of the FlyteWorkflow CRD on startup"`
ArrayNodeEventVersion int `json:"array-node-event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
AcceleratedInputs AcceleratedInputs `json:"accelerated-inputs" pflag:",Accelerated inputs config"`
ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
Expand Down Expand Up @@ -271,6 +274,31 @@ type EventConfig struct {
FallbackToOutputReference bool `json:"fallback-to-output-reference" pflag:",Whether output data should be sent by reference when it is too large to be sent inline in execution events."`
}

// ParallelismBehavior defines how ArrayNode should handle subNode parallelism by default
type ParallelismBehavior = string

const (
// ParallelismBehaviorHybrid means that ArrayNode will adhere to the parallelism defined in the
// ArrayNode exactly. This means `nil` will use the workflow parallelism, and 0 will have
// unlimited parallelism.
ParallelismBehaviorHybrid ParallelismBehavior = "hybrid"

// ParallelismBehaviorUnlimited means that ArrayNode subNodes will be evaluated with unlimited
// parallelism for both nil and 0. If a non-default parallelism (i.e. any value other than nil
// or 0) is set, then ArrayNode will adhere to that value.
ParallelismBehaviorUnlimited ParallelismBehavior = "unlimited"

// ParallelismBehaviorWorkflow means that ArrayNode subNodes will be evaluated using the
// configured workflow parallelism for both nil and 0. If a non-default parallelism (i.e. any
// value other than nil or 0) is set, then ArrayNode will adhere to that value.
ParallelismBehaviorWorkflow ParallelismBehavior = "workflow"
)

// ArrayNodeConfig groups the ArrayNode-specific settings: the eventing version and the
// default parallelism behavior applied to array nodes.
type ArrayNodeConfig struct {
EventVersion int `json:"event-version" pflag:",ArrayNode eventing version. 0 => legacy (drop-in replacement for maptask), 1 => new"`
DefaultParallelismBehavior ParallelismBehavior `json:"default-parallelism-behavior" pflag:",Default parallelism behavior for array nodes"`
}

// GetConfig extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
Expand Down
3 changes: 2 additions & 1 deletion flytepropeller/pkg/controller/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 91dc63f

Please sign in to comment.